We forgot to mention (5) External.Config used in cross-lang, see KafkaIO
ExternalTransformBuilder. This approach is the predecessor of (4) and probably a
really good candidate to be replaced by the Row based Configuration Boyuan is
envisioning (so good to be aware of this).

Thanks for the clear explanation Luke you mention the real issue(s). All the
approaches discussed so far in the end could be easily transformed to produce a
PCollection<Read> and those Read Elements could be read by the generic ReadAll
transform. Notice that this can be internal in some IOs e.g. KafkaIO if they
decide not to expose it. I am not saying that we should force every IO to
support ReadAll in its public API but if we do it is probably a good idea to be
consistent with naming the transform that expects an input PCollection<Read> in
the same way. Also notice that using it will save us of the maintenance issues
discussed in my previous email.

Back to the main concern: the consequences of expansion based on Read: So far I
have not seen consequences for the Splitting part which maps really nice
assuming the Partition info / Restriction is available as part of Read. So far
there are not Serialization because Beam is already enforcing this. Notice that
ReadAll expansion is almost ‘equivalent’ to a poor man SDF at least for the
Bounded case (see the code in my previous email). For the other points:

> a) Do all properties set on a Read apply to the ReadAll? For example, the
> Kafka Read implementation allows you to set the key and value deserializers
> which are also used to dictate the output PCollection type. It also allows you
> to set how the watermark should be computed. Technically a user may want the
> watermark computation to be configurable per Read and they may also want an
> output type which is polymorphic (e.g. Pcollection<Serializable>).

Most of the times they do but for parametric types we cannot support different
types in the outputs of the Read or at least I did not find how to do so (is
there a way to use multiple output Coders on Beam?), we saw this in CassandraIO
and we were discussing adding explicitly these Coders or Serializer
specific methods to the ReadAll transform. This is less nice because it will
imply some repeated methods, but it is still a compromise to gain the other
advantages. I suppose the watermark case you mention is similar because you may
want the watermark to behave differently in each Read and we probably don’t
support this, so it corresponds to the polymorphic category.

> b) Read extends PTransform which brings its own object modelling concerns.

> During the implementations of ReadAll(PCollection<Read>), was it discovered
> that some properties became runtime errors or were ignored if they were set?
> If no, then the code deduplication is likely worth it because we also get a
> lot of javadoc deduplication, but if yes is this an acceptable user
> experience?

No, not so far. This is an interesting part, notice that the Read translation
ends up delegating the read bits to the ReadFn part of ReadAll so the ReadFn is
the real read and must be aware and use all the parameters.

    @Override
    public PCollection<SolrDocument> expand(PBegin input) {
      return input.apply("Create", Create.of(this)).apply("ReadAll", readAll());
    }

I might be missing something for the Unbounded SDF case which is the only case
we have not explored so far. I think one easy way to see the limitations would
be in the ongoing KafkaIO SDF based implementation to try to map
KafkaSourceDescriptor to do the extra PCollection<Read> and the Read logic on
the ReadAll with the SDF to see which constraints we hit, the polymorphic ones
will be there for sure, maybe others will appear (not sure). However it would be
interesting to see if we have a real gain in the maintenance points, but well
let’s not forget also that KafkaIO has a LOT of knobs so probably the generic
implementation could be relatively complex.



On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <[email protected]> wrote:
>
> I had mentioned that approach 1 and approach 2 work for cross language. The 
> difference being that the cross language transform would take a well known 
> definition and convert it to the Read transform. A normal user would have a 
> pipeline that would look like:
> 1: PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
> 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) -> PCollection<Output>
>
> And in the cross language case this would look like:
> 1: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to Read) -> 
> PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
> 2: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to 
> SourceDescriptor) -> PCollection<SourceDescriptor> -> PTransform(ReadAll) -> 
> PCollection<Output>*
> * note that PTransform(Convert Row to SourceDescriptor) only exists since we 
> haven't solved how to use schemas with language bound types in a cross 
> language way. SchemaCoder isn't portable but RowCoder is which is why the 
> conversion step exists. We could have a solution for this at some point in 
> time.
>
> My concern with using Read was around:
> a) Do all properties set on a Read apply to the ReadAll? For example, the 
> Kafka Read implementation allows you to set the key and value deserializers 
> which are also used to dictate the output PCollection type. It also allows 
> you to set how the watermark should be computed. Technically a user may want 
> the watermark computation to be configurable per Read and they may also want 
> an output type which is polymorphic (e.g. PCollection<Serializable>).
> b) Read extends PTransform which brings its own object modelling concerns.
>
> During the implementations of ReadAll(PCollection<Read>), was it discovered 
> that some properties became runtime errors or were ignored if they were set? 
> If no, then the code deduplication is likely worth it because we also get a 
> lot of javadoc deduplication, but if yes is this an acceptable user 
> experience?
>
>
> On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <[email protected]> 
> wrote:
>>
>> I believe that the initial goal of unifying ReadAll as a general 
>> "PTransform<PCollection<Read>, PCollection<OutputType>>” was to reduce the 
>> amount of code duplication and error-prone approach related to this. It 
>> makes much sense since usually we have all needed configuration set in Read 
>> objects and, as Ismaeil mentioned, ReadAll will consist mostly of only 
>> Split-Shuffle-Read stages.  So this case usually can be unified by using 
>> PCollection<Read> as input.
>>
>> On the other hand, we have another need to use Java IOs as cross-language 
>> transforms (as Luke described) which seems only partly in common with 
>> previous pattern of ReadAll using.
>>
>> I’d be more in favour to have only one concept of read configuration for all 
>> needs but seems it’s not easy and I’d be more in favour with Luke and Boyuan 
>> approach with schema. Though, maybe ReadAll is not a very suitable name in 
>> this case because it will can bring some confusions related to previous 
>> pattern of ReadAll uses.
>>
>> On 25 Jun 2020, at 05:00, Boyuan Zhang <[email protected]> wrote:
>>
>> Sorry for the typo. I mean I think we can go with (3) and (4): use the data 
>> type that is schema-aware as the input of ReadAll.
>>
>> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <[email protected]> wrote:
>>>
>>> Thanks for the summary, Cham!
>>>
>>> I think we can go with (2) and (4): use the data type that is schema-aware 
>>> as the input of ReadAll.
>>>
>>> Converting Read into ReadAll helps us to stick with SDF-like IO. But only 
>>> having  (3) is not enough to solve the problem of using ReadAll in x-lang 
>>> case.
>>>
>>> The key point of ReadAll is that the input type of ReadAll should be able 
>>> to cross language boundaries and have compatibilities of 
>>> updating/downgrading. After investigating some possibilities(pure java pojo 
>>> with custom coder, protobuf, row/schema) in Kafka usage, we find that 
>>> row/schema fits our needs most. Here comes (4). I believe that using Read 
>>> as input of ReadAll makes sense in some cases, but I also think not all IOs 
>>> have the same need. I would treat Read as a special type as long as the 
>>> Read is schema-aware.
>>>
>>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <[email protected]> 
>>> wrote:
>>>>
>>>> I see. So it seems like there are three options discussed so far when it 
>>>> comes to defining source descriptors for ReadAll type transforms
>>>>
>>>> (1) Use Read PTransform as the element type of the input PCollection
>>>> (2) Use a POJO that describes the source as the data element of the input 
>>>> PCollection
>>>> (3) Provide a converter as a function to the Read transform which 
>>>> essentially will convert it to a ReadAll (what Eugene mentioned)
>>>>
>>>> I feel like (3) is more suitable for a related set of source descriptions 
>>>> such as files.
>>>> (1) will allow most code-reuse but seems like will make it hard to use the 
>>>> ReadAll transform as a cross-language transform and will break the 
>>>> separation of construction time and runtime constructs
>>>> (2) could result to less code reuse if not careful but will make the 
>>>> transform easier to be used as a cross-language transform without 
>>>> additional modifications
>>>>
>>>> Also, with SDF, we can create ReadAll-like transforms that are more 
>>>> efficient. So we might be able to just define all sources in that format 
>>>> and make Read transforms just an easy to use composite built on top of 
>>>> that (by adding a preceding Create transform).
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <[email protected]> wrote:
>>>>>
>>>>> I believe we do require PTransforms to be serializable since anonymous 
>>>>> DoFns typically capture the enclosing PTransform.
>>>>>
>>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath 
>>>>> <[email protected]> wrote:
>>>>>>
>>>>>> Seems like Read in PCollection<Read> refers to a transform, at least 
>>>>>> here: 
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>>>>
>>>>>> I'm in favour of separating construction time transforms from execution 
>>>>>> time data objects that we store in PCollections as Luke mentioned. Also, 
>>>>>> we don't guarantee that PTransform is serializable so users have the 
>>>>>> additional complexity of providing a corder whenever a PTransform is 
>>>>>> used as a data object.
>>>>>> Also, agree with Boyuan that using simple Java objects that are 
>>>>>> convertible to Beam Rows allow us to make these transforms available to 
>>>>>> other SDKs through the cross-language transforms. Using transforms or 
>>>>>> complex sources as data objects will probably make this difficult.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <[email protected]> wrote:
>>>>>>>
>>>>>>> Hi Ismael,
>>>>>>>
>>>>>>> I think the ReadAll in the IO connector refers to the IO with SDF 
>>>>>>> implementation despite the type of input, where Read refers to 
>>>>>>> UnboundedSource.  One major pushback of using KafkaIO.Read as source 
>>>>>>> description is that not all configurations of KafkaIO.Read are 
>>>>>>> meaningful to populate during execution time. Also when thinking about 
>>>>>>> x-lang useage, making source description across language boundaries is 
>>>>>>> also necessary.  As Luke mentioned, it's quite easy to infer a Schema 
>>>>>>> from an AutoValue object: KafkaSourceDescription.java. Then the coder 
>>>>>>> of this schema-aware object will be a SchemaCoder. When crossing 
>>>>>>> language boundaries, it's also easy to convert a Row into the source 
>>>>>>> description: Convert.fromRows.
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <[email protected]> wrote:
>>>>>>>>
>>>>>>>> To provide additional context, the KafkaIO ReadAll transform takes a 
>>>>>>>> PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is a 
>>>>>>>> POJO that contains the configurable parameters for reading from Kafka. 
>>>>>>>> This is different from the pattern that Ismael listed because they 
>>>>>>>> take PCollection<Read> as input and the Read is the same as the Read 
>>>>>>>> PTransform class used for the non read all case.
>>>>>>>>
>>>>>>>> The KafkaSourceDescriptor does lead to duplication since parameters 
>>>>>>>> used to configure the transform have to be copied over to the source 
>>>>>>>> descriptor but decouples how a transform is specified from the object 
>>>>>>>> that describes what needs to be done. I believe Ismael's point is that 
>>>>>>>> we wouldn't need such a decoupling.
>>>>>>>>
>>>>>>>> Another area that hasn't been discussed and I believe is a non-issue 
>>>>>>>> is that the Beam Java SDK has the most IO connectors and we would want 
>>>>>>>> to use the IO implementations within Beam Go and Beam Python. This 
>>>>>>>> brings in its own set of issues related to versioning and 
>>>>>>>> compatibility for the wire format and how one parameterizes such 
>>>>>>>> transforms. The wire format issue can be solved with either approach 
>>>>>>>> by making sure that the cross language expansion always takes the well 
>>>>>>>> known format (whatever it may be) and converts it into 
>>>>>>>> Read/KafkaSourceDescriptor/... object that is then passed to the 
>>>>>>>> ReadAll transform. Boyuan has been looking to make the 
>>>>>>>> KafkaSourceDescriptor have a schema so it can be represented as a row 
>>>>>>>> and this can be done easily using the AutoValue integration (I don't 
>>>>>>>> believe there is anything preventing someone from writing a schema row 
>>>>>>>> -> Read -> row adapter or also using the AutoValue configuration if 
>>>>>>>> the transform is also an AutoValue).
>>>>>>>>
>>>>>>>> I would be more for the code duplication and separation of concerns 
>>>>>>>> provided by using a different object to represent the contents of the 
>>>>>>>> PCollection from the pipeline construction time PTransform.
>>>>>>>>
>>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov 
>>>>>>>> <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Hi Ismael,
>>>>>>>>>
>>>>>>>>> Thanks for taking this on. Have you considered an approach similar 
>>>>>>>>> (or dual) to FileIO.write(), where we in a sense also have to 
>>>>>>>>> configure a dynamic number different IO transforms of the same type 
>>>>>>>>> (file writes)?
>>>>>>>>>
>>>>>>>>> E.g. how in this example we configure many aspects of many file 
>>>>>>>>> writes:
>>>>>>>>>
>>>>>>>>> transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
>>>>>>>>>      .by(Transaction::getType)
>>>>>>>>>      .via(tx -> tx.getType().toFields(tx),  // Convert the data to be 
>>>>>>>>> written to CSVSink
>>>>>>>>>           type -> new CSVSink(type.getFieldNames()))
>>>>>>>>>      .to(".../path/to/")
>>>>>>>>>      .withNaming(type -> defaultNaming(type + "-transactions", 
>>>>>>>>> ".csv"));
>>>>>>>>>
>>>>>>>>> we could do something similar for many JdbcIO reads:
>>>>>>>>>
>>>>>>>>> PCollection<Bar> bars;  // user-specific type from which all the read 
>>>>>>>>> parameters can be inferred
>>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>>>>>>>>>   .fromQuery(bar -> ...compute query for this bar...)
>>>>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
>>>>>>>>>   .withBatchSize(bar -> ...compute batch size for this bar...)
>>>>>>>>>   ...etc);
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <[email protected]> 
>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> (my excuses for the long email but this requires context)
>>>>>>>>>>
>>>>>>>>>> As part of the move from Source based IOs to DoFn based ones. One 
>>>>>>>>>> pattern
>>>>>>>>>> emerged due to the composable nature of DoFn. The idea is to have a 
>>>>>>>>>> different
>>>>>>>>>> kind of composable reads where we take a PCollection of different 
>>>>>>>>>> sorts of
>>>>>>>>>> intermediate specifications e.g. tables, queries, etc, for example:
>>>>>>>>>>
>>>>>>>>>> JdbcIO:
>>>>>>>>>> ReadAll<ParameterT, OutputT> extends
>>>>>>>>>> PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>>>>>>>>>>
>>>>>>>>>> RedisIO:
>>>>>>>>>> ReadAll extends PTransform<PCollection<String>, 
>>>>>>>>>> PCollection<KV<String, String>>>
>>>>>>>>>>
>>>>>>>>>> HBaseIO:
>>>>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>, 
>>>>>>>>>> PCollection<Result>>
>>>>>>>>>>
>>>>>>>>>> These patterns enabled richer use cases like doing multiple queries 
>>>>>>>>>> in the same
>>>>>>>>>> Pipeline, querying based on key patterns or querying from multiple 
>>>>>>>>>> tables at the
>>>>>>>>>> same time but came with some maintenance issues:
>>>>>>>>>>
>>>>>>>>>> - We ended up needing to add to the ReadAll transforms the 
>>>>>>>>>> parameters for
>>>>>>>>>>   missing information so we ended up with lots of duplicated with 
>>>>>>>>>> methods and
>>>>>>>>>>   error-prone code from the Read transforms into the ReadAll 
>>>>>>>>>> transforms.
>>>>>>>>>>
>>>>>>>>>> - When you require new parameters you have to expand the input 
>>>>>>>>>> parameters of the
>>>>>>>>>>   intermediary specification into something that resembles the full 
>>>>>>>>>> `Read`
>>>>>>>>>>   definition for example imagine you want to read from multiple 
>>>>>>>>>> tables or
>>>>>>>>>>   servers as part of the same pipeline but this was not in the 
>>>>>>>>>> intermediate
>>>>>>>>>>   specification you end up adding those extra methods (duplicating 
>>>>>>>>>> more code)
>>>>>>>>>>   just o get close to the be like the Read full spec.
>>>>>>>>>>
>>>>>>>>>> - If new parameters are added to the Read method we end up adding 
>>>>>>>>>> them
>>>>>>>>>>   systematically to the ReadAll transform too so they are taken into 
>>>>>>>>>> account.
>>>>>>>>>>
>>>>>>>>>> Due to these issues I recently did a change to test a new approach 
>>>>>>>>>> that is
>>>>>>>>>> simpler, more complete and maintainable. The code became:
>>>>>>>>>>
>>>>>>>>>> HBaseIO:
>>>>>>>>>> ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>
>>>>>>>>>>
>>>>>>>>>> With this approach users gain benefits of improvements on parameters 
>>>>>>>>>> of normal
>>>>>>>>>> Read because they count with the full Read parameters. But of course 
>>>>>>>>>> there are
>>>>>>>>>> some minor caveats:
>>>>>>>>>>
>>>>>>>>>> 1. You need to push some information into normal Reads for example
>>>>>>>>>>    partition boundaries information or Restriction information (in 
>>>>>>>>>> the SDF
>>>>>>>>>>    case).  Notice that this consistent approach of ReadAll produces 
>>>>>>>>>> a simple
>>>>>>>>>>    pattern that ends up being almost reusable between IOs (e.g. the  
>>>>>>>>>>   non-SDF
>>>>>>>>>>    case):
>>>>>>>>>>
>>>>>>>>>>   public static class ReadAll extends PTransform<PCollection<Read>,
>>>>>>>>>> PCollection<SolrDocument>> {
>>>>>>>>>>     @Override
>>>>>>>>>>     public PCollection<SolrDocument> expand(PCollection<Read> input) 
>>>>>>>>>> {
>>>>>>>>>>       return input
>>>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>>>>>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>>>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>>>>>>>>>     }
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>> 2. If you are using Generic types for the results ReadAll you must 
>>>>>>>>>> have the
>>>>>>>>>>    Coders used in its definition and require consistent types from 
>>>>>>>>>> the data
>>>>>>>>>>    sources, in practice this means we need to add extra withCoder 
>>>>>>>>>> method(s) on
>>>>>>>>>>    ReadAll but not the full specs.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> At the moment HBaseIO and SolrIO already follow this ReadAll 
>>>>>>>>>> pattern. RedisIO
>>>>>>>>>> and CassandraIO have already WIP PRs to do so. So I wanted to bring 
>>>>>>>>>> this subject
>>>>>>>>>> to the mailing list to see your opinions, and if you see any sort of 
>>>>>>>>>> issues that
>>>>>>>>>> we might be missing with this idea.
>>>>>>>>>>
>>>>>>>>>> Also I would like to see if we have consensus to start using 
>>>>>>>>>> consistently the
>>>>>>>>>> terminology of ReadAll transforms based on Read and the readAll() 
>>>>>>>>>> method for new
>>>>>>>>>> IOs (at this point probably outdoing this in the only remaining 
>>>>>>>>>> inconsistent
>>>>>>>>>> place in JdbcIO might not be a good idea but apart of this we should 
>>>>>>>>>> be ok).
>>>>>>>>>>
>>>>>>>>>> I mention this because the recent PR on KafkaIO based on SDF is 
>>>>>>>>>> doing something
>>>>>>>>>> similar to the old pattern but being called ReadAll and maybe it is 
>>>>>>>>>> worth to be
>>>>>>>>>> consistent for the benefit of users.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Ismaël
>>
>>

On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <[email protected]> wrote:
>
> I had mentioned that approach 1 and approach 2 work for cross language. The 
> difference being that the cross language transform would take a well known 
> definition and convert it to the Read transform. A normal user would have a 
> pipeline that would look like:
> 1: PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
> 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) -> PCollection<Output>
>
> And in the cross language case this would look like:
> 1: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to Read) -> 
> PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
> 2: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to 
> SourceDescriptor) -> PCollection<SourceDescriptor> -> PTransform(ReadAll) -> 
> PCollection<Output>*
> * note that PTransform(Convert Row to SourceDescriptor) only exists since we 
> haven't solved how to use schemas with language bound types in a cross 
> language way. SchemaCoder isn't portable but RowCoder is which is why the 
> conversion step exists. We could have a solution for this at some point in 
> time.
>
> My concern with using Read was around:
> a) Do all properties set on a Read apply to the ReadAll? For example, the 
> Kafka Read implementation allows you to set the key and value deserializers 
> which are also used to dictate the output PCollection type. It also allows 
> you to set how the watermark should be computed. Technically a user may want 
> the watermark computation to be configurable per Read and they may also want 
> an output type which is polymorphic (e.g. PCollection<Serializable>).
> b) Read extends PTransform which brings its own object modelling concerns.
>
> During the implementations of ReadAll(PCollection<Read>), was it discovered 
> that some properties became runtime errors or were ignored if they were set? 
> If no, then the code deduplication is likely worth it because we also get a 
> lot of javadoc deduplication, but if yes is this an acceptable user 
> experience?
>
>
> On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <[email protected]> 
> wrote:
>>
>> I believe that the initial goal of unifying ReadAll as a general 
>> "PTransform<PCollection<Read>, PCollection<OutputType>>” was to reduce the 
>> amount of code duplication and error-prone approach related to this. It 
>> makes much sense since usually we have all needed configuration set in Read 
>> objects and, as Ismaeil mentioned, ReadAll will consist mostly of only 
>> Split-Shuffle-Read stages.  So this case usually can be unified by using 
>> PCollection<Read> as input.
>>
>> On the other hand, we have another need to use Java IOs as cross-language 
>> transforms (as Luke described) which seems only partly in common with 
>> previous pattern of ReadAll using.
>>
>> I’d be more in favour to have only one concept of read configuration for all 
>> needs but seems it’s not easy and I’d be more in favour with Luke and Boyuan 
>> approach with schema. Though, maybe ReadAll is not a very suitable name in 
>> this case because it will can bring some confusions related to previous 
>> pattern of ReadAll uses.
>>
>> On 25 Jun 2020, at 05:00, Boyuan Zhang <[email protected]> wrote:
>>
>> Sorry for the typo. I mean I think we can go with (3) and (4): use the data 
>> type that is schema-aware as the input of ReadAll.
>>
>> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <[email protected]> wrote:
>>>
>>> Thanks for the summary, Cham!
>>>
>>> I think we can go with (2) and (4): use the data type that is schema-aware 
>>> as the input of ReadAll.
>>>
>>> Converting Read into ReadAll helps us to stick with SDF-like IO. But only 
>>> having  (3) is not enough to solve the problem of using ReadAll in x-lang 
>>> case.
>>>
>>> The key point of ReadAll is that the input type of ReadAll should be able 
>>> to cross language boundaries and have compatibilities of 
>>> updating/downgrading. After investigating some possibilities(pure java pojo 
>>> with custom coder, protobuf, row/schema) in Kafka usage, we find that 
>>> row/schema fits our needs most. Here comes (4). I believe that using Read 
>>> as input of ReadAll makes sense in some cases, but I also think not all IOs 
>>> have the same need. I would treat Read as a special type as long as the 
>>> Read is schema-aware.
>>>
>>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <[email protected]> 
>>> wrote:
>>>>
>>>> I see. So it seems like there are three options discussed so far when it 
>>>> comes to defining source descriptors for ReadAll type transforms
>>>>
>>>> (1) Use Read PTransform as the element type of the input PCollection
>>>> (2) Use a POJO that describes the source as the data element of the input 
>>>> PCollection
>>>> (3) Provide a converter as a function to the Read transform which 
>>>> essentially will convert it to a ReadAll (what Eugene mentioned)
>>>>
>>>> I feel like (3) is more suitable for a related set of source descriptions 
>>>> such as files.
>>>> (1) will allow most code-reuse but seems like will make it hard to use the 
>>>> ReadAll transform as a cross-language transform and will break the 
>>>> separation of construction time and runtime constructs
>>>> (2) could result to less code reuse if not careful but will make the 
>>>> transform easier to be used as a cross-language transform without 
>>>> additional modifications
>>>>
>>>> Also, with SDF, we can create ReadAll-like transforms that are more 
>>>> efficient. So we might be able to just define all sources in that format 
>>>> and make Read transforms just an easy to use composite built on top of 
>>>> that (by adding a preceding Create transform).
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <[email protected]> wrote:
>>>>>
>>>>> I believe we do require PTransforms to be serializable since anonymous 
>>>>> DoFns typically capture the enclosing PTransform.
>>>>>
>>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath 
>>>>> <[email protected]> wrote:
>>>>>>
>>>>>> Seems like Read in PCollection<Read> refers to a transform, at least 
>>>>>> here: 
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>>>>
>>>>>> I'm in favour of separating construction time transforms from execution 
>>>>>> time data objects that we store in PCollections as Luke mentioned. Also, 
>>>>>> we don't guarantee that PTransform is serializable so users have the 
>>>>>> additional complexity of providing a corder whenever a PTransform is 
>>>>>> used as a data object.
>>>>>> Also, agree with Boyuan that using simple Java objects that are 
>>>>>> convertible to Beam Rows allow us to make these transforms available to 
>>>>>> other SDKs through the cross-language transforms. Using transforms or 
>>>>>> complex sources as data objects will probably make this difficult.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <[email protected]> wrote:
>>>>>>>
>>>>>>> Hi Ismael,
>>>>>>>
>>>>>>> I think the ReadAll in the IO connector refers to the IO with SDF 
>>>>>>> implementation despite the type of input, where Read refers to 
>>>>>>> UnboundedSource.  One major pushback of using KafkaIO.Read as source 
>>>>>>> description is that not all configurations of KafkaIO.Read are 
>>>>>>> meaningful to populate during execution time. Also when thinking about 
>>>>>>> x-lang useage, making source description across language boundaries is 
>>>>>>> also necessary.  As Luke mentioned, it's quite easy to infer a Schema 
>>>>>>> from an AutoValue object: KafkaSourceDescription.java. Then the coder 
>>>>>>> of this schema-aware object will be a SchemaCoder. When crossing 
>>>>>>> language boundaries, it's also easy to convert a Row into the source 
>>>>>>> description: Convert.fromRows.
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <[email protected]> wrote:
>>>>>>>>
>>>>>>>> To provide additional context, the KafkaIO ReadAll transform takes a 
>>>>>>>> PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is a 
>>>>>>>> POJO that contains the configurable parameters for reading from Kafka. 
>>>>>>>> This is different from the pattern that Ismael listed because they 
>>>>>>>> take PCollection<Read> as input and the Read is the same as the Read 
>>>>>>>> PTransform class used for the non read all case.
>>>>>>>>
>>>>>>>> The KafkaSourceDescriptor does lead to duplication since parameters 
>>>>>>>> used to configure the transform have to be copied over to the source 
>>>>>>>> descriptor but decouples how a transform is specified from the object 
>>>>>>>> that describes what needs to be done. I believe Ismael's point is that 
>>>>>>>> we wouldn't need such a decoupling.
>>>>>>>>
>>>>>>>> Another area that hasn't been discussed and I believe is a non-issue 
>>>>>>>> is that the Beam Java SDK has the most IO connectors and we would want 
>>>>>>>> to use the IO implementations within Beam Go and Beam Python. This 
>>>>>>>> brings in its own set of issues related to versioning and 
>>>>>>>> compatibility for the wire format and how one parameterizes such 
>>>>>>>> transforms. The wire format issue can be solved with either approach 
>>>>>>>> by making sure that the cross language expansion always takes the well 
>>>>>>>> known format (whatever it may be) and converts it into 
>>>>>>>> Read/KafkaSourceDescriptor/... object that is then passed to the 
>>>>>>>> ReadAll transform. Boyuan has been looking to make the 
>>>>>>>> KafkaSourceDescriptor have a schema so it can be represented as a row 
>>>>>>>> and this can be done easily using the AutoValue integration (I don't 
>>>>>>>> believe there is anything preventing someone from writing a schema row 
>>>>>>>> -> Read -> row adapter or also using the AutoValue configuration if 
>>>>>>>> the transform is also an AutoValue).
>>>>>>>>
>>>>>>>> I would be more for the code duplication and separation of concerns 
>>>>>>>> provided by using a different object to represent the contents of the 
>>>>>>>> PCollection from the pipeline construction time PTransform.
>>>>>>>>
>>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov 
>>>>>>>> <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Hi Ismael,
>>>>>>>>>
>>>>>>>>> Thanks for taking this on. Have you considered an approach similar 
>>>>>>>>> (or dual) to FileIO.write(), where we in a sense also have to 
>>>>>>>>> configure a dynamic number different IO transforms of the same type 
>>>>>>>>> (file writes)?
>>>>>>>>>
>>>>>>>>> E.g. how in this example we configure many aspects of many file 
>>>>>>>>> writes:
>>>>>>>>>
>>>>>>>>> transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
>>>>>>>>>      .by(Transaction::getType)
>>>>>>>>>      .via(tx -> tx.getType().toFields(tx),  // Convert the data to be 
>>>>>>>>> written to CSVSink
>>>>>>>>>           type -> new CSVSink(type.getFieldNames()))
>>>>>>>>>      .to(".../path/to/")
>>>>>>>>>      .withNaming(type -> defaultNaming(type + "-transactions", 
>>>>>>>>> ".csv"));
>>>>>>>>>
>>>>>>>>> we could do something similar for many JdbcIO reads:
>>>>>>>>>
>>>>>>>>> PCollection<Bar> bars;  // user-specific type from which all the read 
>>>>>>>>> parameters can be inferred
>>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>>>>>>>>>   .fromQuery(bar -> ...compute query for this bar...)
>>>>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
>>>>>>>>>   .withBatchSize(bar -> ...compute batch size for this bar...)
>>>>>>>>>   ...etc);
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <[email protected]> 
>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> (my excuses for the long email but this requires context)
>>>>>>>>>>
>>>>>>>>>> As part of the move from Source based IOs to DoFn based ones. One 
>>>>>>>>>> pattern
>>>>>>>>>> emerged due to the composable nature of DoFn. The idea is to have a 
>>>>>>>>>> different
>>>>>>>>>> kind of composable reads where we take a PCollection of different 
>>>>>>>>>> sorts of
>>>>>>>>>> intermediate specifications e.g. tables, queries, etc, for example:
>>>>>>>>>>
>>>>>>>>>> JdbcIO:
>>>>>>>>>> ReadAll<ParameterT, OutputT> extends
>>>>>>>>>> PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>>>>>>>>>>
>>>>>>>>>> RedisIO:
>>>>>>>>>> ReadAll extends PTransform<PCollection<String>, 
>>>>>>>>>> PCollection<KV<String, String>>>
>>>>>>>>>>
>>>>>>>>>> HBaseIO:
>>>>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>, 
>>>>>>>>>> PCollection<Result>>
>>>>>>>>>>
>>>>>>>>>> These patterns enabled richer use cases like doing multiple queries 
>>>>>>>>>> in the same
>>>>>>>>>> Pipeline, querying based on key patterns or querying from multiple 
>>>>>>>>>> tables at the
>>>>>>>>>> same time but came with some maintenance issues:
>>>>>>>>>>
>>>>>>>>>> - We ended up needing to add to the ReadAll transforms the 
>>>>>>>>>> parameters for
>>>>>>>>>>   missing information so we ended up with lots of duplicated with 
>>>>>>>>>> methods and
>>>>>>>>>>   error-prone code from the Read transforms into the ReadAll 
>>>>>>>>>> transforms.
>>>>>>>>>>
>>>>>>>>>> - When you require new parameters you have to expand the input 
>>>>>>>>>> parameters of the
>>>>>>>>>>   intermediary specification into something that resembles the full 
>>>>>>>>>> `Read`
>>>>>>>>>>   definition for example imagine you want to read from multiple 
>>>>>>>>>> tables or
>>>>>>>>>>   servers as part of the same pipeline but this was not in the 
>>>>>>>>>> intermediate
>>>>>>>>>>   specification you end up adding those extra methods (duplicating 
>>>>>>>>>> more code)
>>>>>>>>>>   just o get close to the be like the Read full spec.
>>>>>>>>>>
>>>>>>>>>> - If new parameters are added to the Read method we end up adding 
>>>>>>>>>> them
>>>>>>>>>>   systematically to the ReadAll transform too so they are taken into 
>>>>>>>>>> account.
>>>>>>>>>>
>>>>>>>>>> Due to these issues I recently did a change to test a new approach 
>>>>>>>>>> that is
>>>>>>>>>> simpler, more complete and maintainable. The code became:
>>>>>>>>>>
>>>>>>>>>> HBaseIO:
>>>>>>>>>> ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>
>>>>>>>>>>
>>>>>>>>>> With this approach users gain benefits of improvements on parameters 
>>>>>>>>>> of normal
>>>>>>>>>> Read because they count with the full Read parameters. But of course 
>>>>>>>>>> there are
>>>>>>>>>> some minor caveats:
>>>>>>>>>>
>>>>>>>>>> 1. You need to push some information into normal Reads for example
>>>>>>>>>>    partition boundaries information or Restriction information (in 
>>>>>>>>>> the SDF
>>>>>>>>>>    case).  Notice that this consistent approach of ReadAll produces 
>>>>>>>>>> a simple
>>>>>>>>>>    pattern that ends up being almost reusable between IOs (e.g. the  
>>>>>>>>>>   non-SDF
>>>>>>>>>>    case):
>>>>>>>>>>
>>>>>>>>>>   public static class ReadAll extends PTransform<PCollection<Read>,
>>>>>>>>>> PCollection<SolrDocument>> {
>>>>>>>>>>     @Override
>>>>>>>>>>     public PCollection<SolrDocument> expand(PCollection<Read> input) 
>>>>>>>>>> {
>>>>>>>>>>       return input
>>>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>>>>>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>>>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>>>>>>>>>     }
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>> 2. If you are using Generic types for the results ReadAll you must 
>>>>>>>>>> have the
>>>>>>>>>>    Coders used in its definition and require consistent types from 
>>>>>>>>>> the data
>>>>>>>>>>    sources, in practice this means we need to add extra withCoder 
>>>>>>>>>> method(s) on
>>>>>>>>>>    ReadAll but not the full specs.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> At the moment HBaseIO and SolrIO already follow this ReadAll 
>>>>>>>>>> pattern. RedisIO
>>>>>>>>>> and CassandraIO have already WIP PRs to do so. So I wanted to bring 
>>>>>>>>>> this subject
>>>>>>>>>> to the mailing list to see your opinions, and if you see any sort of 
>>>>>>>>>> issues that
>>>>>>>>>> we might be missing with this idea.
>>>>>>>>>>
>>>>>>>>>> Also I would like to see if we have consensus to start using 
>>>>>>>>>> consistently the
>>>>>>>>>> terminology of ReadAll transforms based on Read and the readAll() 
>>>>>>>>>> method for new
>>>>>>>>>> IOs (at this point probably outdoing this in the only remaining 
>>>>>>>>>> inconsistent
>>>>>>>>>> place in JdbcIO might not be a good idea but apart of this we should 
>>>>>>>>>> be ok).
>>>>>>>>>>
>>>>>>>>>> I mention this because the recent PR on KafkaIO based on SDF is 
>>>>>>>>>> doing something
>>>>>>>>>> similar to the old pattern but being called ReadAll and maybe it is 
>>>>>>>>>> worth to be
>>>>>>>>>> consistent for the benefit of users.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Ismaël
>>
>>

Reply via email to