Hi Ismael,
Thanks for taking this on. Have you considered an approach similar (or
dual) to FileIO.write(), where in a sense we also have to configure a
dynamic number of different IO transforms of the same type (file writes)?
E.g. in this example we configure many aspects of many file writes:
transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
    .by(Transaction::getType)
    .via(tx -> tx.getType().toFields(tx),  // Convert the data to be written to CSVSink
         type -> new CSVSink(type.getFieldNames()))
    .to(".../path/to/")
    .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
We could do something similar for many JdbcIO reads:
PCollection<Bar> bars;  // user-specific type from which all the read parameters can be inferred
PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
    .fromQuery(bar -> ...compute query for this bar...)
    .withMapper((bar, resultSet) -> new Moo(...))
    .withBatchSize(bar -> ...compute batch size for this bar...)
    ...etc);
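
For concreteness, here is a minimal sketch of how a user might invoke such
an API; fromQuery/withMapper/withBatchSize as per-element functions are
hypothetical and do not exist in JdbcIO today:

PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
    // Hypothetical: derive the SQL query from each input element.
    .fromQuery(bar -> "SELECT * FROM moos WHERE bar_id = " + bar.getId())
    // Hypothetical: map each ResultSet row with the originating element in scope.
    .withMapper((bar, resultSet) -> new Moo(bar, resultSet.getString("name")))
    // Hypothetical: tune the fetch/batch size per input element.
    .withBatchSize(bar -> bar.isLarge() ? 10000 : 1000));
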
On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <[email protected]> wrote:
> Hello,
>
> (my excuses for the long email but this requires context)
>
> As part of the move from Source-based IOs to DoFn-based ones, one pattern
> emerged due to the composable nature of DoFn. The idea is to have a
> different kind of composable read where we take a PCollection of
> intermediate specifications, e.g. tables, queries, etc. For example:
>
> JdbcIO:
> ReadAll<ParameterT, OutputT> extends
> PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>
> RedisIO:
> ReadAll extends PTransform<PCollection<String>, PCollection<KV<String, String>>>
>
> HBaseIO:
> ReadAll extends PTransform<PCollection<HBaseQuery>, PCollection<Result>>
>
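> For reference, this is roughly how JdbcIO's current ReadAll is used to
> run one parameterized query per input element (lightly simplified):
>
> PCollection<String> groupIds = ...;
> PCollection<KV<String, String>> users = groupIds.apply(
>     JdbcIO.<String, KV<String, String>>readAll()
>         .withDataSourceConfiguration(dataSourceConfiguration)
>         .withQuery("SELECT id, name FROM users WHERE group_id = ?")
>         .withParameterSetter((groupId, stmt) -> stmt.setString(1, groupId))
>         .withRowMapper(rs -> KV.of(rs.getString("id"), rs.getString("name")))
>         .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));
>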
> These patterns enabled richer use cases like doing multiple queries in
> the same Pipeline, querying based on key patterns, or querying from
> multiple tables at the same time, but they came with some maintenance
> issues:
>
> - We ended up needing to add the missing parameters to the ReadAll
> transforms, so we ended up with lots of duplicated with* methods and
> error-prone code copied from the Read transforms into the ReadAll
> transforms (see the sketch after this list).
>
> - When you require new parameters you have to expand the input of the
> intermediary specification into something that resembles the full `Read`
> definition. For example, imagine you want to read from multiple tables or
> servers as part of the same pipeline, but this was not in the intermediate
> specification: you end up adding those extra methods (duplicating more
> code) just to get close to the full Read spec.
>
> - If new parameters are added to Read, we end up adding them
> systematically to the ReadAll transform too so they are taken into
> account.
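>
> To make the duplication concrete (the class and method names here are
> purely illustrative), the old pattern forces something like:
>
> public static class Read extends PTransform<PBegin, PCollection<Result>> {
>   // The full set of parameters lives here...
>   public Read withTableId(String tableId) { /* copy and set */ return this; }
>   public Read withQuery(String query) { /* copy and set */ return this; }
>   public Read withFetchSize(int fetchSize) { /* copy and set */ return this; }
>   // ... plus expand(), etc.
> }
>
> public static class ReadAll
>     extends PTransform<PCollection<String>, PCollection<Result>> {
>   // ...and must be mirrored here just to approximate the full Read spec.
>   public ReadAll withTableId(String tableId) { /* copy and set */ return this; }
>   public ReadAll withFetchSize(int fetchSize) { /* copy and set */ return this; }
> }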
>
> Due to these issues I recently made a change to test a new approach that
> is simpler, more complete and more maintainable. The code became:
>
> HBaseIO:
> ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>
>
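> A rough usage sketch (the two Reads below are illustrative; in practice
> each Read carries its own full configuration):
>
> PCollection<HBaseIO.Read> reads = pipeline.apply(Create.of(
>     HBaseIO.read().withConfiguration(conf).withTableId("table1"),
>     HBaseIO.read().withConfiguration(conf).withTableId("table2")));
> PCollection<Result> results = reads.apply(HBaseIO.readAll());
>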
> With this approach users gain the benefit of any improvements to the
> parameters of the normal Read, because they have the full set of Read
> parameters available. But of course there are some minor caveats:
>
> 1. You need to push some information into the normal Reads, for example
>    partition boundary information or Restriction information (in the SDF
>    case). Notice that this consistent approach to ReadAll produces a
>    simple pattern that ends up being almost reusable between IOs (e.g.
>    the non-SDF case), as shown below; the SplitFn/ReadFn bodies
>    themselves are sketched after this list:
>
> public static class ReadAll extends PTransform<PCollection<Read>,
>     PCollection<SolrDocument>> {
>   @Override
>   public PCollection<SolrDocument> expand(PCollection<Read> input) {
>     return input
>         .apply("Split", ParDo.of(new SplitFn()))
>         .apply("Reshuffle", Reshuffle.viaRandomKey())
>         .apply("Read", ParDo.of(new ReadFn()));
>   }
> }
>
> 2. If you are using generic types for the ReadAll results, you must have
>    the Coders used in its definition and require consistent types from
>    the data sources; in practice this means we need to add extra
>    withCoder method(s) on ReadAll but not on the full specs (see the
>    second sketch after this list).
>
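> A minimal sketch of the two DoFns used above; the splitting and reading
> logic (splitIntoShards, executeRead) is hypothetical and would be
> IO-specific:
>
> static class SplitFn extends DoFn<Read, Read> {
>   @ProcessElement
>   public void process(@Element Read read, OutputReceiver<Read> out) {
>     // Split the single Read spec into smaller Reads, e.g. one per shard.
>     for (Read split : splitIntoShards(read)) {  // hypothetical helper
>       out.output(split);
>     }
>   }
> }
>
> static class ReadFn extends DoFn<Read, SolrDocument> {
>   @ProcessElement
>   public void process(@Element Read read, OutputReceiver<SolrDocument> out) {
>     // Execute the (already split) Read and emit the results.
>     for (SolrDocument doc : executeRead(read)) {  // hypothetical helper
>       out.output(doc);
>     }
>   }
> }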
>
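> And for the second caveat, the output coder ends up being supplied once
> on ReadAll; SomeIO, OutputT and outputCoder below are illustrative:
>
> PCollection<OutputT> output = reads.apply(
>     SomeIO.<OutputT>readAll().withCoder(outputCoder));
>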
> At the moment HBaseIO and SolrIO already follow this ReadAll pattern, and
> RedisIO and CassandraIO already have WIP PRs to do so. So I wanted to
> bring this subject to the mailing list to hear your opinions, and to see
> if there is any sort of issue that we might be missing with this idea.
>
> Also I would like to see if we have consensus to start consistently using
> the terminology of ReadAll transforms based on Read, and the readAll()
> method, for new IOs (at this point redoing this in the only remaining
> inconsistent place, JdbcIO, might not be a good idea, but apart from this
> we should be ok).
>
> I mention this because the recent PR on KafkaIO based on SDF is doing
> something similar to the old pattern but is called ReadAll, and maybe it
> is worth being consistent for the benefit of users.
>
> Regards,
> Ismaël
>