Hi Ismael,

Thanks for taking this on. Have you considered an approach similar (or
dual) to FileIO.write(), where we also, in a sense, have to configure a
dynamic number of different IO transforms of the same type (file writes)?

E.g. how in this example we configure many aspects of many file writes:

transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
     .by(Transaction::getType)
     .via(tx -> tx.getType().toFields(tx),  // Convert the data to be written to CSVSink
          type -> new CSVSink(type.getFieldNames()))
     .to(".../path/to/")
     .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));

we could do something similar for many JdbcIO reads:

// user-specific type from which all the read parameters can be inferred
PCollection<Bar> bars;
PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
  .fromQuery(bar -> ...compute query for this bar...)
  .withMapper((bar, resultSet) -> new Moo(...))
  .withBatchSize(bar -> ...compute batch size for this bar...)
  ...etc);


On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <[email protected]> wrote:

> Hello,
>
> (my apologies for the long email, but this requires context)
>
> As part of the move from Source-based IOs to DoFn-based ones, one pattern
> emerged due to the composable nature of DoFn. The idea is to have a
> different kind of composable read, where we take a PCollection of
> different sorts of intermediate specifications, e.g. tables, queries,
> etc. For example:
>
> JdbcIO:
> ReadAll<ParameterT, OutputT> extends
>     PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>
> RedisIO:
> ReadAll extends
>     PTransform<PCollection<String>, PCollection<KV<String, String>>>
>
> HBaseIO:
> ReadAll extends PTransform<PCollection<HBaseQuery>, PCollection<Result>>
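>
> For instance, with JdbcIO's readAll() a user can run the same query once
> per element of an input PCollection (a hypothetical sketch; MyRow and
> config are invented for illustration):
>
>   PCollection<Integer> userIds = ...;  // one query parameter per element
>   PCollection<MyRow> rows = userIds.apply(
>       JdbcIO.<Integer, MyRow>readAll()
>           .withDataSourceConfiguration(config)  // assumed defined elsewhere
>           .withQuery("SELECT * FROM orders WHERE user_id = ?")
>           .withParameterSetter((id, stmt) -> stmt.setInt(1, id))
>           .withRowMapper(rs -> new MyRow(rs.getLong(1), rs.getString(2)))
>           // MyRow is assumed Serializable so a coder can be supplied
>           .withCoder(SerializableCoder.of(MyRow.class)));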
>
> These patterns enabled richer use cases, like doing multiple queries in
> the same pipeline, querying based on key patterns, or querying multiple
> tables at the same time, but they came with some maintenance issues:
>
> - We ended up needing to add parameters for the missing information to
>   the ReadAll transforms, so we ended up with lots of duplicated `with`
>   methods and error-prone code copied from the Read transforms into the
>   ReadAll transforms.
>
> - When you require new parameters, you have to expand the intermediate
>   specification into something that resembles the full `Read` definition.
>   For example, imagine you want to read from multiple tables or servers
>   as part of the same pipeline, but this was not in the intermediate
>   specification: you end up adding those extra methods (duplicating more
>   code) just to get closer to the full Read spec.
>
> - If new parameters are added to the Read transform, we end up having to
>   add them systematically to the ReadAll transform too so they are taken
>   into account.
>
> Due to these issues I recently made a change to test a new approach that
> is simpler, more complete, and more maintainable. The code became:
>
> HBaseIO:
> ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>
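>
> For illustration, on the user side this could look roughly like the
> following (a hypothetical sketch; conf and the table names are invented,
> and coder registration for Read is elided):
>
>   PCollection<HBaseIO.Read> reads = pipeline.apply(Create.of(
>       HBaseIO.read().withConfiguration(conf).withTableId("table1"),
>       HBaseIO.read().withConfiguration(conf).withTableId("table2")));
>   // Each fully-specified Read executes as part of the same pipeline.
>   PCollection<Result> results = reads.apply(HBaseIO.readAll());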
>
> With this approach users automatically gain the benefits of improvements
> to the parameters of the normal Read, because they have the full set of
> Read parameters available. But of course there are some minor caveats:
>
> 1. You need to push some information into the normal Reads, for example
>    partition boundary information or Restriction information (in the SDF
>    case). Notice that this consistent approach to ReadAll produces a
>    simple pattern that ends up being almost reusable between IOs (e.g. in
>    the non-SDF case):
>
>   public static class ReadAll
>       extends PTransform<PCollection<Read>, PCollection<SolrDocument>> {
>     @Override
>     public PCollection<SolrDocument> expand(PCollection<Read> input) {
>       return input
>           // Split each Read into smaller, fully-specified sub-Reads
>           .apply("Split", ParDo.of(new SplitFn()))
>           // Redistribute the sub-Reads so they can be read in parallel
>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>           // Execute each sub-Read and emit its results
>           .apply("Read", ParDo.of(new ReadFn()));
>     }
>   }
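>
>    For illustration, a rough sketch of what those DoFns might look like
>    (hypothetical; splitIntoShards and executeRead are assumed helpers,
>    and the real SolrIO code differs in detail):
>
>   static class SplitFn extends DoFn<Read, Read> {
>     @ProcessElement
>     public void process(@Element Read read, OutputReceiver<Read> out) {
>       // Emit one sub-Read per shard/partition of the source (assumed helper)
>       for (Read shard : splitIntoShards(read)) {
>         out.output(shard);
>       }
>     }
>   }
>
>   static class ReadFn extends DoFn<Read, SolrDocument> {
>     @ProcessElement
>     public void process(@Element Read read, OutputReceiver<SolrDocument> out) {
>       // Execute the (sub-)Read and emit its results (assumed helper)
>       for (SolrDocument doc : executeRead(read)) {
>         out.output(doc);
>       }
>     }
>   }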
>
> 2. If you are using generic types for the results of ReadAll, you must
>    have the Coders used in its definition and require consistent types
>    from the data sources; in practice this means we need to add extra
>    withCoder method(s) on ReadAll, but not the full specs.
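>
>    For example, a hypothetical sketch of such a withCoder method in use
>    (MyRecord and specs are invented for illustration):
>
>   PCollection<MyRecord> records = specs.apply(
>       SomeIO.<MyRecord>readAll()
>           // The output coder cannot be inferred from the generic type,
>           // so the user supplies it explicitly
>           .withCoder(SerializableCoder.of(MyRecord.class)));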
>
>
> At the moment HBaseIO and SolrIO already follow this ReadAll pattern, and
> RedisIO and CassandraIO have WIP PRs to do so. So I wanted to bring this
> subject to the mailing list to hear your opinions, and to see if there
> are any issues that we might be missing with this idea.
>
> Also, I would like to see if we have consensus to start consistently
> using the terminology of ReadAll transforms based on Read, and the
> readAll() method, for new IOs (at this point, changing this in the only
> remaining inconsistent place, JdbcIO, might not be a good idea, but apart
> from this we should be ok).
>
> I mention this because the recent PR on KafkaIO based on SDF is doing
> something similar to the old pattern while being called ReadAll, and
> maybe it is worth being consistent for the benefit of users.
>
> Regards,
> Ismaël
>
