Hi Ismael,

I think the ReadAll in the IO connector refers to the IO with SDF
implementation despite the type of input, where Read refers to
UnboundedSource.  One major pushback of using KafkaIO.Read as source
description is that not all configurations of KafkaIO.Read are meaningful
to populate during execution time. Also when thinking about x-lang useage,
making source description across language boundaries is also necessary.  As
Luke mentioned, it's quite easy to infer a Schema from an AutoValue object:
KafkaSourceDescription.java
<https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java#L41>.
Then the coder of this schema-aware object will be a SchemaCoder
<https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java#L84>.
When crossing language boundaries, it's also easy to convert a Row into the
source description: Convert.fromRows
<https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1480>
.


On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <lc...@google.com> wrote:

> To provide additional context, the KafkaIO ReadAll transform takes a
> PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is a POJO
> that contains the configurable parameters for reading from Kafka. This is
> different from the pattern that Ismael listed because they take
> PCollection<Read> as input and the Read is the same as the Read PTransform
> class used for the non read all case.
>
> The KafkaSourceDescriptor does lead to duplication since parameters used
> to configure the transform have to be copied over to the source descriptor
> but decouples how a transform is specified from the object that describes
> what needs to be done. I believe Ismael's point is that we wouldn't need
> such a decoupling.
>
> Another area that hasn't been discussed and I believe is a non-issue is
> that the Beam Java SDK has the most IO connectors and we would want to use
> the IO implementations within Beam Go and Beam Python. This brings in its
> own set of issues related to versioning and compatibility for the wire
> format and how one parameterizes such transforms. The wire format issue can
> be solved with either approach by making sure that the cross language
> expansion always takes the well known format (whatever it may be) and
> converts it into Read/KafkaSourceDescriptor/... object that is then passed
> to the ReadAll transform. Boyuan has been looking to make the
> KafkaSourceDescriptor have a schema so it can be represented as a row and
> this can be done easily using the AutoValue integration (I don't believe
> there is anything preventing someone from writing a schema row -> Read ->
> row adapter or also using the AutoValue configuration if the transform is
> also an AutoValue).
>
> I would be more for the code duplication and separation of concerns
> provided by using a different object to represent the contents of the
> PCollection from the pipeline construction time PTransform.
>
> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> Hi Ismael,
>>
>> Thanks for taking this on. Have you considered an approach similar (or
>> dual) to FileIO.write(), where we in a sense also have to configure a
>> dynamic number different IO transforms of the same type (file writes)?
>>
>> E.g. how in this example we configure many aspects of many file writes:
>>
>> transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
>>      .by(Transaction::getType)
>>      .via(tx -> tx.getType().toFields(tx),  // Convert the data to be
>> written to CSVSink
>>           type -> new CSVSink(type.getFieldNames()))
>>      .to(".../path/to/")
>>      .withNaming(type -> defaultNaming(type + "-transactions", ".csv"));
>>
>> we could do something similar for many JdbcIO reads:
>>
>> PCollection<Bar> bars;  // user-specific type from which all the read
>> parameters can be inferred
>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>>   .fromQuery(bar -> ...compute query for this bar...)
>>   .withMapper((bar, resultSet) -> new Moo(...))
>>   .withBatchSize(bar -> ...compute batch size for this bar...)
>>   ...etc);
>>
>>
>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> (my excuses for the long email but this requires context)
>>>
>>> As part of the move from Source based IOs to DoFn based ones. One pattern
>>> emerged due to the composable nature of DoFn. The idea is to have a
>>> different
>>> kind of composable reads where we take a PCollection of different sorts
>>> of
>>> intermediate specifications e.g. tables, queries, etc, for example:
>>>
>>> JdbcIO:
>>> ReadAll<ParameterT, OutputT> extends
>>> PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>>>
>>> RedisIO:
>>> ReadAll extends PTransform<PCollection<String>, PCollection<KV<String,
>>> String>>>
>>>
>>> HBaseIO:
>>> ReadAll extends PTransform<PCollection<HBaseQuery>, PCollection<Result>>
>>>
>>> These patterns enabled richer use cases like doing multiple queries in
>>> the same
>>> Pipeline, querying based on key patterns or querying from multiple
>>> tables at the
>>> same time but came with some maintenance issues:
>>>
>>> - We ended up needing to add to the ReadAll transforms the parameters for
>>>   missing information so we ended up with lots of duplicated with
>>> methods and
>>>   error-prone code from the Read transforms into the ReadAll transforms.
>>>
>>> - When you require new parameters you have to expand the input
>>> parameters of the
>>>   intermediary specification into something that resembles the full
>>> `Read`
>>>   definition for example imagine you want to read from multiple tables or
>>>   servers as part of the same pipeline but this was not in the
>>> intermediate
>>>   specification you end up adding those extra methods (duplicating more
>>> code)
>>>   just o get close to the be like the Read full spec.
>>>
>>> - If new parameters are added to the Read method we end up adding them
>>>   systematically to the ReadAll transform too so they are taken into
>>> account.
>>>
>>> Due to these issues I recently did a change to test a new approach that
>>> is
>>> simpler, more complete and maintainable. The code became:
>>>
>>> HBaseIO:
>>> ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>
>>>
>>> With this approach users gain benefits of improvements on parameters of
>>> normal
>>> Read because they count with the full Read parameters. But of course
>>> there are
>>> some minor caveats:
>>>
>>> 1. You need to push some information into normal Reads for example
>>>    partition boundaries information or Restriction information (in the
>>> SDF
>>>    case).  Notice that this consistent approach of ReadAll produces a
>>> simple
>>>    pattern that ends up being almost reusable between IOs (e.g. the
>>> non-SDF
>>>    case):
>>>
>>>   public static class ReadAll extends PTransform<PCollection<Read>,
>>> PCollection<SolrDocument>> {
>>>     @Override
>>>     public PCollection<SolrDocument> expand(PCollection<Read> input) {
>>>       return input
>>>           .apply("Split", ParDo.of(new SplitFn()))
>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>>>           .apply("Read", ParDo.of(new ReadFn()));
>>>     }
>>>   }
>>>
>>> 2. If you are using Generic types for the results ReadAll you must have
>>> the
>>>    Coders used in its definition and require consistent types from the
>>> data
>>>    sources, in practice this means we need to add extra withCoder
>>> method(s) on
>>>    ReadAll but not the full specs.
>>>
>>>
>>> At the moment HBaseIO and SolrIO already follow this ReadAll pattern.
>>> RedisIO
>>> and CassandraIO have already WIP PRs to do so. So I wanted to bring this
>>> subject
>>> to the mailing list to see your opinions, and if you see any sort of
>>> issues that
>>> we might be missing with this idea.
>>>
>>> Also I would like to see if we have consensus to start using
>>> consistently the
>>> terminology of ReadAll transforms based on Read and the readAll() method
>>> for new
>>> IOs (at this point probably outdoing this in the only remaining
>>> inconsistent
>>> place in JdbcIO might not be a good idea but apart of this we should be
>>> ok).
>>>
>>> I mention this because the recent PR on KafkaIO based on SDF is doing
>>> something
>>> similar to the old pattern but being called ReadAll and maybe it is
>>> worth to be
>>> consistent for the benefit of users.
>>>
>>> Regards,
>>> Ismaël
>>>
>>

Reply via email to