Vincent, I will be out in the sense that I cannot really engage myself into more
activities because I have apart of your review two more pending + other work to
finish so I prefer not to add more work I cannot finish. I am still available
for the review however so let’s get this finally finished there. Thanks for
pointing this because it made me realize that I missed one important part in my
last message that was oriented towards a better solution but ignoring the
ongoing work.

Even if it has limitations it seems that the ReadAll based on PCollection<Read>
approach has clear benefits and Vincent’s use case is one more instance, so I
would like to propose that for the in-progress PRs we keep it as proposed until
we find a better solution. Notice however that in every case ReadAll is a new
feature and should be marked still as @Experimental so we can still improve it /
change course.

On Wed, Jul 15, 2020 at 1:02 AM Vincent Marquez
<[email protected]> wrote:
>
> Hi everyone, i've been working on 
> https://issues.apache.org/jira/browse/BEAM-9008 for quite some time now, 
> trying to contribute back the much improved Cassandra connector.  I 
> originally had been passing around a 'config' object to the readAll, but 
> after much discussion with Ismaël we decided it was best if I refactor to the 
> readAll taking a Read<A> as a parameter to be more uniform with some of the 
> other Connectors.
>
> I don't see a use case for using Read as a key itself, especially for the 
> CassandraIO's Read given it itself contains functions (Equality and functions 
> just seem like a weird thing to even try to deal with).
>
> For Ismaël and others, I would like to get this merged in sometime soon, as I 
> believe it has all of the requested functionality.  If Ismaël is leaving for 
> a month, is there someone else who could help me with this?
>
>
> ~Vincent
>
>
> On Tue, Jul 14, 2020 at 2:56 PM Ismaël Mejía <[email protected]> wrote:
>>
>> It has been really interesting to read all the perspectives in this thread 
>> and I
>> have even switched sides back and forth given the advantages / issues exposed
>> here, so it means we have clear pros/cons.
>>
>> One ‘not so nice‘ discovery related to this discussion for me was BEAM-10375 
>> [1]
>> tldr; Reads use java serialization so they don’t have a default deterministic
>> coder and if they are used as keys they break on GbK because Java’s
>> implementation requires keys to be deterministic [2] (is this the case in all
>> the other languages?). We can workaround this by having an alternative Coder 
>> for
>> Reads but somehow it does not feel natural and adds extra maintenance.
>>
>> I really like Kenn’s idea that we should rethink from scratch or write a
>> proposal of how we can have designed this with the present awareness about 
>> DoFn
>> based composition, code reuse and schema friendliness. Maybe worth to 
>> enumerate
>> what are the essentials we want to have (or not) first. I will be OOO for the
>> next month so I cannot actively work on this, but I will be interested on
>> reviewing/contributing in case someone wants to take the lead on a better
>> solution or we can in the meantime keep bringing ideas to this thread.
>>
>> Configuration based on functions translates hardly across languages so I 
>> wonder
>> if we should have also a mechanism to map those. Notice that an important use
>> case of this is the detailed configuration of clients for IOs which we have
>> started to expose in some IOs to avoid filling IOs API with ‘knobs‘ and 
>> better
>> let the user do their tuning by providing a client via a function.
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-10375
>> [2] 
>> https://github.com/apache/beam/blob/a9d70fed9069c4f4e9a12860ef711652f5f9c21a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java#L232-L237
>>
>> On Thu, Jul 9, 2020 at 5:52 AM Kenneth Knowles <[email protected]> wrote:
>> >
>> > If we are forced to create a fresh class due to a breaking change, let's 
>> > migrate to the "what we would do from scratch" approach, please.
>> >
>> > Kenn
>> >
>> > On Wed, Jul 8, 2020 at 5:15 PM Robert Bradshaw <[email protected]> wrote:
>> >>
>> >> OK, I'm +0 on this change. Using the PTransform as an element is
>> >> probably better than duplicating the full API on another interface,
>> >> and think it's worth getting this ublocked. This will require a Read2
>> >> if we have to add options in a upgrade-compatible way.
>> >>
>> >> On Tue, Jul 7, 2020 at 3:19 PM Luke Cwik <[email protected]> wrote:
>> >> >
>> >> > Robert, you're correct in your understanding that the Read PTransform 
>> >> > would be encoded via the schema coder.
>> >> >
>> >> > Kenn, different serializers are ok as long as the output coder can 
>> >> > encode/decode the output type. Different watermark fns are also ok 
>> >> > since it is about computing the watermark for each individual source 
>> >> > and won't impact the watermark computed by other sources. Watermark 
>> >> > advancement will still be held back by the source that is furthest 
>> >> > behind and still has the same problems when a user chooses a watermark 
>> >> > fn that was incompatible with the windowing strategy for producing 
>> >> > output (e.g. global window + default trigger + streaming pipeline).
>> >> >
>> >> > Both are pretty close so if we started from scratch then it could go 
>> >> > either way but we aren't starting from scratch (I don't think a Beam 
>> >> > 3.0 is likely to happen in the next few years as there isn't enough 
>> >> > stuff that we want to remove vs the amount of stuff we would gain).
>> >> >
>> >> > On Tue, Jul 7, 2020 at 2:57 PM Kenneth Knowles <[email protected]> wrote:
>> >> >>
>> >> >> On Tue, Jul 7, 2020 at 2:24 PM Robert Bradshaw <[email protected]> 
>> >> >> wrote:
>> >> >>>
>> >> >>> On Tue, Jul 7, 2020 at 2:06 PM Luke Cwik <[email protected]> wrote:
>> >> >>> >
>> >> >>> > Robert, the intent is that the Read object would use a schema coder 
>> >> >>> > and for XLang purposes would be no different then a POJO.
>> >> >>>
>> >> >>> Just to clarify, you're saying that the Read PTransform would be
>> >> >>> encoded via the schema coder? That still feels a bit odd (and
>> >> >>> specificically if we were designing IO from scratch rather than
>> >> >>> adapting to what already exists would we choose to use PTransforms as
>> >> >>> elements?) but would solve the cross language issue.
>> >> >>
>> >> >>
>> >> >> I like this question. If we were designing from scratch, what would we 
>> >> >> do? Would we encourage users to feed Create.of(SourceDescriptor) into 
>> >> >> ReadAll? We would probably provide a friendly wrapper for reading one 
>> >> >> static thing, and call it Read. But it would probably have an API like 
>> >> >> Read.from(SourceDescriptor), thus eliminating duplicate documentation 
>> >> >> and boilerplate that Luke described while keeping the separation that 
>> >> >> Brian described and clarity around xlang environments. But I'm +0 on 
>> >> >> whatever has momentum. I think the main downside is the weirdness 
>> >> >> around serializers/watermarkFn/etc on Read. I am not sure how much 
>> >> >> this will cause users problems. It would be very ambitious of them to 
>> >> >> produce a PCollection<Read> where they had different fns per element...
>> >> >>
>> >> >> Kenn
>> >> >>
>> >> >>>
>> >> >>> > The issue of how to deal with closures applies to both equally and 
>> >> >>> > that is why I suggested to favor using data over closures. Once 
>> >> >>> > there is an implementation for how to deal with UDFs in an XLang 
>> >> >>> > world, this guidance can change.
>> >> >>> >
>> >> >>> > Kenn, I did mean specifying an enum that the XLang expansion 
>> >> >>> > service would return a serialized blob of code. The XLang expansion 
>> >> >>> > service is responsible for returning an environment that contains 
>> >> >>> > all the necessary dependencies to execute the transforms and the 
>> >> >>> > serialized blob of code and hence would be a non-issue for the 
>> >> >>> > caller.
>> >> >>> >
>> >> >>> > From reviewing the SDF Kafka PR, the reduction in maintenance is 
>> >> >>> > definitely there (100s of lines of duplicated boilerplate and 
>> >> >>> > documentation).
>> >> >>> >
>> >> >>> > What are the next steps to get a resolution on this?
>> >> >>> >
>> >> >>> > On Thu, Jul 2, 2020 at 10:38 AM Robert Bradshaw 
>> >> >>> > <[email protected]> wrote:
>> >> >>> >>
>> >> >>> >> On Thu, Jul 2, 2020 at 10:26 AM Kenneth Knowles <[email protected]> 
>> >> >>> >> wrote:
>> >> >>> >>>
>> >> >>> >>>
>> >> >>> >>> On Wed, Jul 1, 2020 at 4:17 PM Eugene Kirpichov 
>> >> >>> >>> <[email protected]> wrote:
>> >> >>> >>>>
>> >> >>> >>>> Kenn - I don't mean an enum of common closures, I mean 
>> >> >>> >>>> expressing closures in a restricted sub-language such as the 
>> >> >>> >>>> language of SQL expressions.
>> >> >>> >>>
>> >> >>> >>>
>> >> >>> >>> My lack of clarity: enums was my phrasing of Luke's item 1). I 
>> >> >>> >>> understood what you meant. I think either a set of well-known 
>> >> >>> >>> closures or a tiny sublanguage could add value.
>> >> >>> >>>
>> >> >>> >>>>
>> >> >>> >>>> That would only work if there is a portable way to interpret SQL 
>> >> >>> >>>> expressions, but if there isn't, maybe there should be - for the 
>> >> >>> >>>> sake of, well, expressing closures portably. Of course these 
>> >> >>> >>>> would be closures that only work with rows - but that seems 
>> >> >>> >>>> powerful enough for many if not most purposes.
>> >> >>> >>>
>> >> >>> >>>
>> >> >>> >>> You can choose a SQL dialect or choose the tiniest subset just 
>> >> >>> >>> for this purpose and go with it. But when the data type going in 
>> >> >>> >>> or out of the lambda are e.g. some Java or Python object then 
>> >> >>> >>> what? One idea is to always require these to be rows. But if you 
>> >> >>> >>> can really get away with a dependency-free context-free lambda, 
>> >> >>> >>> then Javascript or Python is as doable as SQL in terms of having 
>> >> >>> >>> a tiny restricted language for just this purpose. I would expect 
>> >> >>> >>> once it got used, folks would start to ask to include the rest of 
>> >> >>> >>> what the language has to offer - its ecosystem. This is always 
>> >> >>> >>> the main design point I am interested in for "lightweight" 
>> >> >>> >>> embedded UDF proposals.
>> >> >>> >>
>> >> >>> >>
>> >> >>> >> This is getting off the topic of ReadAll, but I think being able 
>> >> >>> >> to do arbitrary computation in preceding/succeeding transform plus 
>> >> >>> >> a (quite) restricted language in the transform itself can go a 
>> >> >>> >> long way. (For example, one could have a dynamic destinations 
>> >> >>> >> write that takes a KV<element, dest> where dest is a format string 
>> >> >>> >> like "foo-{{shard_num}}.txt" to plug in the truly dynamic pieces, 
>> >> >>> >> but the dest string itself can be computed (e.g. based on the 
>> >> >>> >> element) using arbitrary code in the caller language.)
>> >> >>> >>
>> >> >>> >>>
>> >> >>> >>>
>> >> >>> >>> Kenn
>> >> >>> >>>
>> >> >>> >>>>
>> >> >>> >>>> For example, maybe the Java example:
>> >> >>> >>>>
>> >> >>> >>>>  PCollection<BankTransaction> transactions = ...;
>> >> >>> >>>>  transactions.apply(FileIO.<TransactionType, 
>> >> >>> >>>> Transaction>writeDynamic()
>> >> >>> >>>>      .by(Transaction::getType)
>> >> >>> >>>>      .via(tx -> tx.getType().toFields(tx),  // Convert the data 
>> >> >>> >>>> to be written to CSVSink
>> >> >>> >>>>           type -> new CSVSink(type.getFieldNames()))
>> >> >>> >>>>      .to(".../path/to/")
>> >> >>> >>>>      .withNaming(type -> defaultNaming(type + "-transactions", 
>> >> >>> >>>> ".csv"));
>> >> >>> >>>>
>> >> >>> >>>> could be written in Python as:
>> >> >>> >>>>
>> >> >>> >>>> transactions | fileio.write_dynamic(
>> >> >>> >>>>   by="it.type",  # "it" is implicitly available in these SQL 
>> >> >>> >>>> expressions as the same thing as the Java lambda argument
>> >> >>> >>>>   format="it.fields",
>> >> >>> >>>>   sink="CSV_SINK(it.type.field_names)",  # A bunch of preset 
>> >> >>> >>>> sinks supported in every language?
>> >> >>> >>>>   to=".../path/to/",
>> >> >>> >>>>   naming="DEFAULT_NAMING(CONCAT(it, '-transactions'), '.csv')")
>> >> >>> >>>>
>> >> >>> >>>> Again, to be clear, I'm not suggesting to block what Ismael is 
>> >> >>> >>>> proposing on getting this done - getting this done wouldn't be a 
>> >> >>> >>>> short term effort, but seems potentially really nice.
>> >> >>> >>>>
>> >> >>> >>>>
>> >> >>> >>>> On Wed, Jul 1, 2020 at 3:19 PM Robert Burke <[email protected]> 
>> >> >>> >>>> wrote:
>> >> >>> >>>>>
>> >> >>> >>>>> From the Go side of the table, the Go language doesn't provide 
>> >> >>> >>>>> a mechanism to serialize or access closure data, which means 
>> >> >>> >>>>> DoFns can't be functional closures.This combined with the move 
>> >> >>> >>>>> to have the "Structural DoFns" be serialized using Beam 
>> >> >>> >>>>> Schemas, has the net result that if Go transforms are used for 
>> >> >>> >>>>> Cross Language, they will be configurable with a Schema of the 
>> >> >>> >>>>> configuration data.
>> >> >>> >>>>>
>> >> >>> >>>>> Of course, this just means that each language will probably 
>> >> >>> >>>>> provide whichever mechanisms it likes for use of it's cross 
>> >> >>> >>>>> language transforms.
>> >> >>> >>>>>
>> >> >>> >>>>> On Tue, 30 Jun 2020 at 16:07, Kenneth Knowles <[email protected]> 
>> >> >>> >>>>> wrote:
>> >> >>> >>>>>>
>> >> >>> >>>>>> I don't think an enum of most common closures will work. The 
>> >> >>> >>>>>> input types are typically generics that are made concrete by 
>> >> >>> >>>>>> the caller who also provides the closures. I think Luke's (2) 
>> >> >>> >>>>>> is the same idea as my "Java still assembles it [using opaque 
>> >> >>> >>>>>> Python closures/transforms]". It seems like an approach to 
>> >> >>> >>>>>> (3). Passing over actual code could address some cases, but 
>> >> >>> >>>>>> libraries become the issue.
>> >> >>> >>>>>>
>> >> >>> >>>>>> I think it is fair to say that "WriteAll" style would involve 
>> >> >>> >>>>>> entering unexplored territory.
>> >> >>> >>>>>>
>> >> >>> >>>>>> On the main topic, I think Brian has a pretty strong point and 
>> >> >>> >>>>>> his example of type conversion lambdas is a good example. I 
>> >> >>> >>>>>> did a quick survey and every other property I could find does 
>> >> >>> >>>>>> seem like it fits on the Read, and most IOs have a few of 
>> >> >>> >>>>>> these closures for example also extracting timestamps. So 
>> >> >>> >>>>>> maybe just a resolution convention of putting them on the 
>> >> >>> >>>>>> ReadAll and that taking precedence. Then you would be 
>> >> >>> >>>>>> deserializing a Read transform with insta-crash methods or 
>> >> >>> >>>>>> some such?
>> >> >>> >>>>>>
>> >> >>> >>>>>> Kenn
>> >> >>> >>>>>>
>> >> >>> >>>>>> On Tue, Jun 30, 2020 at 10:24 AM Eugene Kirpichov 
>> >> >>> >>>>>> <[email protected]> wrote:
>> >> >>> >>>>>>>
>> >> >>> >>>>>>> Yeah, mainly I just feel like dynamic reads and dynamic 
>> >> >>> >>>>>>> writes (and perhaps not-yet-invented similar transforms of 
>> >> >>> >>>>>>> other kinds) are tightly related - they are either very 
>> >> >>> >>>>>>> similar, or are duals of each other - so they should use the 
>> >> >>> >>>>>>> same approach. If they are using different approaches, it is 
>> >> >>> >>>>>>> a sign that either one of them is being done wrong or that we 
>> >> >>> >>>>>>> are running into a fundamental limitation of Beam (e.g. 
>> >> >>> >>>>>>> difficulty of encoding closures compared to encoding 
>> >> >>> >>>>>>> elements).
>> >> >>> >>>>>>>
>> >> >>> >>>>>>> But I agree with Luke that we shouldn't give up on closures. 
>> >> >>> >>>>>>> Especially with the work that has been done on schemas and 
>> >> >>> >>>>>>> SQL, I see no reason why we couldn't express closures in a 
>> >> >>> >>>>>>> portable restricted sub-language. If we can express SQL, we 
>> >> >>> >>>>>>> can express many or most use cases of dynamic reads/writes - 
>> >> >>> >>>>>>> I don't mean that we should actually use SQL (though we could 
>> >> >>> >>>>>>> - e.g. SQL scalar expressions seem powerful enough to express 
>> >> >>> >>>>>>> the closures appearing in most use cases of 
>> >> >>> >>>>>>> FileIO.writeDynamic), I just mean that SQL is an existence 
>> >> >>> >>>>>>> proof.
>> >> >>> >>>>>>>
>> >> >>> >>>>>>> (I don't want to rock the boat too much, just thought I'd 
>> >> >>> >>>>>>> chime in as this topic is dear to my heart)
>> >> >>> >>>>>>>
>> >> >>> >>>>>>> On Tue, Jun 30, 2020 at 9:59 AM Luke Cwik <[email protected]> 
>> >> >>> >>>>>>> wrote:
>> >> >>> >>>>>>>>
>> >> >>> >>>>>>>> Kenn, I'm not too worried about closures since:
>> >> >>> >>>>>>>> 1) the expansion service for a transform could have a well 
>> >> >>> >>>>>>>> set of defined closures by name that are returned as 
>> >> >>> >>>>>>>> serialized objects that don't need to be interpretable by 
>> >> >>> >>>>>>>> the caller
>> >> >>> >>>>>>>> 2) the language could store serialized functions of another 
>> >> >>> >>>>>>>> language as constants
>> >> >>> >>>>>>>> 3) generic XLang function support will eventually be needed
>> >> >>> >>>>>>>> but I do agree that closures do make things difficult to 
>> >> >>> >>>>>>>> express vs data which is why primarily why we should prefer 
>> >> >>> >>>>>>>> data over closures when possible and use closures when 
>> >> >>> >>>>>>>> expressing it with data would be too cumbersome.
>> >> >>> >>>>>>>>
>> >> >>> >>>>>>>> Brian, so far the cases that have been migrated have shown 
>> >> >>> >>>>>>>> that the source descriptor and the Read transform are almost 
>> >> >>> >>>>>>>> the same (some parameters that only impact pipeline 
>> >> >>> >>>>>>>> construction such as coders differ).
>> >> >>> >>>>>>>>
>> >> >>> >>>>>>>> On Mon, Jun 29, 2020 at 2:33 PM Brian Hulette 
>> >> >>> >>>>>>>> <[email protected]> wrote:
>> >> >>> >>>>>>>>>
>> >> >>> >>>>>>>>> Sorry for jumping into this late and casting a vote against 
>> >> >>> >>>>>>>>> the consensus... but I think I'd prefer standardizing on a 
>> >> >>> >>>>>>>>> pattern like PCollection<KafkaSourceDescriptor> rather than 
>> >> >>> >>>>>>>>> PCollection<Read>. That approach clearly separates the 
>> >> >>> >>>>>>>>> parameters that are allowed to vary across a ReadAll (the 
>> >> >>> >>>>>>>>> ones defined in KafkaSourceDescriptor) from the parameters 
>> >> >>> >>>>>>>>> that should be constant (other parameters in the Read 
>> >> >>> >>>>>>>>> object, like SerializedFunctions for type conversions, 
>> >> >>> >>>>>>>>> parameters for different operating modes, etc...). I think 
>> >> >>> >>>>>>>>> it's helpful to think of the parameters that are allowed to 
>> >> >>> >>>>>>>>> vary as some "location descriptor", but I imagine IO 
>> >> >>> >>>>>>>>> authors may want other parameters to vary across a ReadAll 
>> >> >>> >>>>>>>>> as well.
>> >> >>> >>>>>>>>>
>> >> >>> >>>>>>>>> To me it seems safer to let an IO author "opt-in" to a 
>> >> >>> >>>>>>>>> parameter being dynamic at execution time.
>> >> >>> >>>>>>>>>
>> >> >>> >>>>>>>>> Brian
>> >> >>> >>>>>>>>>
>> >> >>> >>>>>>>>> On Mon, Jun 29, 2020 at 9:26 AM Eugene Kirpichov 
>> >> >>> >>>>>>>>> <[email protected]> wrote:
>> >> >>> >>>>>>>>>>
>> >> >>> >>>>>>>>>> I'd like to raise one more time the question of 
>> >> >>> >>>>>>>>>> consistency between dynamic reads and dynamic writes, per 
>> >> >>> >>>>>>>>>> my email at the beginning of the thread.
>> >> >>> >>>>>>>>>> If the community prefers ReadAll to read from Read, then 
>> >> >>> >>>>>>>>>> should dynamicWrite's write to Write?
>> >> >>> >>>>>>>>>>
>> >> >>> >>>>>>>>>> On Mon, Jun 29, 2020 at 8:57 AM Boyuan Zhang 
>> >> >>> >>>>>>>>>> <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>
>> >> >>> >>>>>>>>>>> It seems like most of us agree on the idea that ReadAll 
>> >> >>> >>>>>>>>>>> should read from Read. I'm going to update the Kafka 
>> >> >>> >>>>>>>>>>> ReadAll with the same pattern.
>> >> >>> >>>>>>>>>>> Thanks for all your help!
>> >> >>> >>>>>>>>>>>
>> >> >>> >>>>>>>>>>> On Fri, Jun 26, 2020 at 12:12 PM Chamikara Jayalath 
>> >> >>> >>>>>>>>>>> <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>> On Fri, Jun 26, 2020 at 11:49 AM Luke Cwik 
>> >> >>> >>>>>>>>>>>> <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>> I would also like to suggest that transforms that 
>> >> >>> >>>>>>>>>>>>> implement ReadAll via Read should also provide methods 
>> >> >>> >>>>>>>>>>>>> like:
>> >> >>> >>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>> // Uses the specified values if unspecified in the 
>> >> >>> >>>>>>>>>>>>> input element from the PCollection<Read>.
>> >> >>> >>>>>>>>>>>>> withDefaults(Read read);
>> >> >>> >>>>>>>>>>>>> // Uses the specified values regardless of what the 
>> >> >>> >>>>>>>>>>>>> input element from the PCollection<Read> specifies.
>> >> >>> >>>>>>>>>>>>> withOverrides(Read read);
>> >> >>> >>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>> and only adds methods that are required at construction 
>> >> >>> >>>>>>>>>>>>> time (e.g. coders). This way the majority of 
>> >> >>> >>>>>>>>>>>>> documentation sits on the Read transform.
>> >> >>> >>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>> +0 from me. Sounds like benefits outweigh the drawbacks 
>> >> >>> >>>>>>>>>>>> here and some of the drawbacks related to cross-language 
>> >> >>> >>>>>>>>>>>> can be overcome through future advancements.
>> >> >>> >>>>>>>>>>>> Thanks for bringing this up Ismaël.
>> >> >>> >>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>> - Cham
>> >> >>> >>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>> On Fri, Jun 26, 2020 at 9:58 AM Luke Cwik 
>> >> >>> >>>>>>>>>>>>> <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>> Ismael, it is good to hear that using Read as the 
>> >> >>> >>>>>>>>>>>>>> input didn't have a bunch of parameters that were 
>> >> >>> >>>>>>>>>>>>>> being skipped/ignored. Also, for the polymorphism 
>> >> >>> >>>>>>>>>>>>>> issue you have to rely on the user correctly telling 
>> >> >>> >>>>>>>>>>>>>> you the type in such a way where it is a common 
>> >> >>> >>>>>>>>>>>>>> ancestor of all the runtime types that will ever be 
>> >> >>> >>>>>>>>>>>>>> used. This usually boils down to something like 
>> >> >>> >>>>>>>>>>>>>> Serializable or DynamicMessage such that the coder 
>> >> >>> >>>>>>>>>>>>>> that is chosen works for all the runtime types. Using 
>> >> >>> >>>>>>>>>>>>>> multiple types is a valid use case and would allow for 
>> >> >>> >>>>>>>>>>>>>> a simpler graph with less flattens merging the output 
>> >> >>> >>>>>>>>>>>>>> from multiple sources.
>> >> >>> >>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>> Boyuan, as you have mentioned we can have a coder for 
>> >> >>> >>>>>>>>>>>>>> KafkaIO.Read which uses schemas even if some of the 
>> >> >>> >>>>>>>>>>>>>> parameters can't be represented in a meaningful way 
>> >> >>> >>>>>>>>>>>>>> beyond "bytes". This would be helpful for cross 
>> >> >>> >>>>>>>>>>>>>> language as well since every parameter would become 
>> >> >>> >>>>>>>>>>>>>> available if a language could support it (e.g. it 
>> >> >>> >>>>>>>>>>>>>> could serialize a java function up front and keep it 
>> >> >>> >>>>>>>>>>>>>> saved as raw bytes within said language). Even if we 
>> >> >>> >>>>>>>>>>>>>> figure out a better way to do this in the future, 
>> >> >>> >>>>>>>>>>>>>> we'll have to change the schema for the new way 
>> >> >>> >>>>>>>>>>>>>> anyway. This would mean that the external version of 
>> >> >>> >>>>>>>>>>>>>> the transform adopts Row to Read and we drop 
>> >> >>> >>>>>>>>>>>>>> KafkaSourceDescriptor. The conversion from Row to Read 
>> >> >>> >>>>>>>>>>>>>> could validate that the parameters make sense (e.g. 
>> >> >>> >>>>>>>>>>>>>> the bytes are valid serialized functions). The 
>> >> >>> >>>>>>>>>>>>>> addition of an endReadTime/endReadOffset would make 
>> >> >>> >>>>>>>>>>>>>> sense for KafkaIO.Read as well and this would enable 
>> >> >>> >>>>>>>>>>>>>> having a bounded version that could be used for 
>> >> >>> >>>>>>>>>>>>>> backfills (this doesn't have to be done as part of any 
>> >> >>> >>>>>>>>>>>>>> current ongoing PR). Essentially any parameter that 
>> >> >>> >>>>>>>>>>>>>> could be added for a single instance of a Kafka 
>> >> >>> >>>>>>>>>>>>>> element+restriction would also make sense to the 
>> >> >>> >>>>>>>>>>>>>> KafkaIO.Read transform since it too is a single 
>> >> >>> >>>>>>>>>>>>>> instance. There are parameters that would apply to the 
>> >> >>> >>>>>>>>>>>>>> ReadAll that wouldn't apply to a read and these would 
>> >> >>> >>>>>>>>>>>>>> be global parameters across all element+restriction 
>> >> >>> >>>>>>>>>>>>>> pairs such as config overrides or default values.
>> >> >>> >>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>> I am convinced that we should do as Ismael is 
>> >> >>> >>>>>>>>>>>>>> suggesting and use KafkaIO.Read as the type.
>> >> >>> >>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:00 PM Chamikara Jayalath 
>> >> >>> >>>>>>>>>>>>>> <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>> Discussion regarding cross-language transforms is a 
>> >> >>> >>>>>>>>>>>>>>> slight tangent here. But I think, in general, it's 
>> >> >>> >>>>>>>>>>>>>>> great if we can use existing transforms (for example, 
>> >> >>> >>>>>>>>>>>>>>> IO connectors) as cross-language transforms without 
>> >> >>> >>>>>>>>>>>>>>> having to build more composites (irrespective of 
>> >> >>> >>>>>>>>>>>>>>> whether in ExternalTransformBuilders or a user 
>> >> >>> >>>>>>>>>>>>>>> pipelines) just to make them cross-language 
>> >> >>> >>>>>>>>>>>>>>> compatible. A future cross-language compatible 
>> >> >>> >>>>>>>>>>>>>>> SchemaCoder might help (assuming that works for Read 
>> >> >>> >>>>>>>>>>>>>>> transform) but I'm not sure we have a good idea when 
>> >> >>> >>>>>>>>>>>>>>> we'll get to that state.
>> >> >>> >>>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>> Thanks,
>> >> >>> >>>>>>>>>>>>>>> Cham
>> >> >>> >>>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang 
>> >> >>> >>>>>>>>>>>>>>> <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>> For unbounded SDF in Kafka, we also consider the 
>> >> >>> >>>>>>>>>>>>>>>> upgrading/downgrading compatibility in the pipeline 
>> >> >>> >>>>>>>>>>>>>>>> update scenario(For detailed discussion, please 
>> >> >>> >>>>>>>>>>>>>>>> refer to 
>> >> >>> >>>>>>>>>>>>>>>> https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E).
>> >> >>> >>>>>>>>>>>>>>>>  In order to obtain the compatibility, it requires 
>> >> >>> >>>>>>>>>>>>>>>> the input of the read SDF is schema-aware.
>> >> >>> >>>>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>> Thus the major constraint of mapping 
>> >> >>> >>>>>>>>>>>>>>>> KafkaSourceDescriptor to PCollection<Read> is, the 
>> >> >>> >>>>>>>>>>>>>>>> KafkaIO.Read also needs to be schema-aware, 
>> >> >>> >>>>>>>>>>>>>>>> otherwise pipeline updates might fail unnecessarily. 
>> >> >>> >>>>>>>>>>>>>>>> If looking into KafkaIO.Read, not all necessary 
>> >> >>> >>>>>>>>>>>>>>>> fields are compatible with schema, for example, 
>> >> >>> >>>>>>>>>>>>>>>> SerializedFunction.
>> >> >>> >>>>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>> I'm kind of confused by why ReadAll<Read, OutputT> 
>> >> >>> >>>>>>>>>>>>>>>> is a common pattern for SDF based IO. The Read can 
>> >> >>> >>>>>>>>>>>>>>>> be a common pattern because the input is always a 
>> >> >>> >>>>>>>>>>>>>>>> PBegin. But for an SDF based IO, the input can be 
>> >> >>> >>>>>>>>>>>>>>>> anything. By using Read as input, we will still have 
>> >> >>> >>>>>>>>>>>>>>>> the maintenance cost when SDF IO supports a new 
>> >> >>> >>>>>>>>>>>>>>>> field but Read doesn't consume it. For example, we 
>> >> >>> >>>>>>>>>>>>>>>> are discussing adding endOffset and endReadTime to 
>> >> >>> >>>>>>>>>>>>>>>> KafkaSourceDescriptior, which is not used in 
>> >> >>> >>>>>>>>>>>>>>>> KafkaIO.Read.
>> >> >>> >>>>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía 
>> >> >>> >>>>>>>>>>>>>>>> <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> We forgot to mention (5) External.Config used in 
>> >> >>> >>>>>>>>>>>>>>>>> cross-lang, see KafkaIO
>> >> >>> >>>>>>>>>>>>>>>>> ExternalTransformBuilder. This approach is the 
>> >> >>> >>>>>>>>>>>>>>>>> predecessor of (4) and probably a
>> >> >>> >>>>>>>>>>>>>>>>> really good candidate to be replaced by the Row 
>> >> >>> >>>>>>>>>>>>>>>>> based Configuration Boyuan is
>> >> >>> >>>>>>>>>>>>>>>>> envisioning (so good to be aware of this).
>> >> >>> >>>>>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> Thanks for the clear explanation Luke you mention 
>> >> >>> >>>>>>>>>>>>>>>>> the real issue(s). All the
>> >> >>> >>>>>>>>>>>>>>>>> approaches discussed so far in the end could be 
>> >> >>> >>>>>>>>>>>>>>>>> easily transformed to produce a
>> >> >>> >>>>>>>>>>>>>>>>> PCollection<Read> and those Read Elements could be 
>> >> >>> >>>>>>>>>>>>>>>>> read by the generic ReadAll
>> >> >>> >>>>>>>>>>>>>>>>> transform. Notice that this can be internal in some 
>> >> >>> >>>>>>>>>>>>>>>>> IOs e.g. KafkaIO if they
>> >> >>> >>>>>>>>>>>>>>>>> decide not to expose it. I am not saying that we 
>> >> >>> >>>>>>>>>>>>>>>>> should force every IO to
>> >> >>> >>>>>>>>>>>>>>>>> support ReadAll in its public API but if we do it 
>> >> >>> >>>>>>>>>>>>>>>>> is probably a good idea to be
>> >> >>> >>>>>>>>>>>>>>>>> consistent with naming the transform that expects 
>> >> >>> >>>>>>>>>>>>>>>>> an input PCollection<Read> in
>> >> >>> >>>>>>>>>>>>>>>>> the same way. Also notice that using it will save 
>> >> >>> >>>>>>>>>>>>>>>>> us of the maintenance issues
>> >> >>> >>>>>>>>>>>>>>>>> discussed in my previous email.
>> >> >>> >>>>>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> Back to the main concern: the consequences of 
>> >> >>> >>>>>>>>>>>>>>>>> expansion based on Read: So far I
>> >> >>> >>>>>>>>>>>>>>>>> have not seen consequences for the Splitting part 
>> >> >>> >>>>>>>>>>>>>>>>> which maps really nice
>> >> >>> >>>>>>>>>>>>>>>>> assuming the Partition info / Restriction is 
>> >> >>> >>>>>>>>>>>>>>>>> available as part of Read. So far
>> >> >>> >>>>>>>>>>>>>>>>> there are not Serialization because Beam is already 
>> >> >>> >>>>>>>>>>>>>>>>> enforcing this. Notice that
>> >> >>> >>>>>>>>>>>>>>>>> ReadAll expansion is almost ‘equivalent’ to a poor 
>> >> >>> >>>>>>>>>>>>>>>>> man SDF at least for the
>> >> >>> >>>>>>>>>>>>>>>>> Bounded case (see the code in my previous email). 
>> >> >>> >>>>>>>>>>>>>>>>> For the other points:
>> >> >>> >>>>>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> > a) Do all properties set on a Read apply to the 
>> >> >>> >>>>>>>>>>>>>>>>> > ReadAll? For example, the
>> >> >>> >>>>>>>>>>>>>>>>> > Kafka Read implementation allows you to set the 
>> >> >>> >>>>>>>>>>>>>>>>> > key and value deserializers
>> >> >>> >>>>>>>>>>>>>>>>> > which are also used to dictate the output 
>> >> >>> >>>>>>>>>>>>>>>>> > PCollection type. It also allows you
>> >> >>> >>>>>>>>>>>>>>>>> > to set how the watermark should be computed. 
>> >> >>> >>>>>>>>>>>>>>>>> > Technically a user may want the
>> >> >>> >>>>>>>>>>>>>>>>> > watermark computation to be configurable per Read 
>> >> >>> >>>>>>>>>>>>>>>>> > and they may also want an
>> >> >>> >>>>>>>>>>>>>>>>> > output type which is polymorphic (e.g. 
>> >> >>> >>>>>>>>>>>>>>>>> > Pcollection<Serializable>).
>> >> >>> >>>>>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> Most of the times they do but for parametric types 
>> >> >>> >>>>>>>>>>>>>>>>> we cannot support different
>> >> >>> >>>>>>>>>>>>>>>>> types in the outputs of the Read or at least I did 
>> >> >>> >>>>>>>>>>>>>>>>> not find how to do so (is
>> >> >>> >>>>>>>>>>>>>>>>> there a way to use multiple output Coders on 
>> >> >>> >>>>>>>>>>>>>>>>> Beam?), we saw this in CassandraIO
>> >> >>> >>>>>>>>>>>>>>>>> and we were discussing adding explicitly these 
>> >> >>> >>>>>>>>>>>>>>>>> Coders or Serializer
>> >> >>> >>>>>>>>>>>>>>>>> specific methods to the ReadAll transform. This is 
>> >> >>> >>>>>>>>>>>>>>>>> less nice because it will
>> >> >>> >>>>>>>>>>>>>>>>> imply some repeated methods, but it is still a 
>> >> >>> >>>>>>>>>>>>>>>>> compromise to gain the other
>> >> >>> >>>>>>>>>>>>>>>>> advantages. I suppose the watermark case you 
>> >> >>> >>>>>>>>>>>>>>>>> mention is similar because you may
>> >> >>> >>>>>>>>>>>>>>>>> want the watermark to behave differently in each 
>> >> >>> >>>>>>>>>>>>>>>>> Read and we probably don’t
>> >> >>> >>>>>>>>>>>>>>>>> support this, so it corresponds to the polymorphic 
>> >> >>> >>>>>>>>>>>>>>>>> category.
>> >> >>> >>>>>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> > b) Read extends PTransform which brings its own 
>> >> >>> >>>>>>>>>>>>>>>>> > object modelling concerns.
>> >> >>> >>>>>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> > During the implementations of 
>> >> >>> >>>>>>>>>>>>>>>>> > ReadAll(PCollection<Read>), was it discovered
>> >> >>> >>>>>>>>>>>>>>>>> > that some properties became runtime errors or 
>> >> >>> >>>>>>>>>>>>>>>>> > were ignored if they were set?
>> >> >>> >>>>>>>>>>>>>>>>> > If no, then the code deduplication is likely 
>> >> >>> >>>>>>>>>>>>>>>>> > worth it because we also get a
>> >> >>> >>>>>>>>>>>>>>>>> > lot of javadoc deduplication, but if yes is this 
>> >> >>> >>>>>>>>>>>>>>>>> > an acceptable user
>> >> >>> >>>>>>>>>>>>>>>>> > experience?
>> >> >>> >>>>>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> No, not so far. This is an interesting part, notice 
>> >> >>> >>>>>>>>>>>>>>>>> that the Read translation
>> >> >>> >>>>>>>>>>>>>>>>> ends up delegating the read bits to the ReadFn part 
>> >> >>> >>>>>>>>>>>>>>>>> of ReadAll so the ReadFn is
>> >> >>> >>>>>>>>>>>>>>>>> the real read and must be aware and use all the 
>> >> >>> >>>>>>>>>>>>>>>>> parameters.
>> >> >>> >>>>>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>>     @Override
>> >> >>> >>>>>>>>>>>>>>>>>     public PCollection<SolrDocument> expand(PBegin 
>> >> >>> >>>>>>>>>>>>>>>>> input) {
>> >> >>> >>>>>>>>>>>>>>>>>       return input.apply("Create", 
>> >> >>> >>>>>>>>>>>>>>>>> Create.of(this)).apply("ReadAll", readAll());
>> >> >>> >>>>>>>>>>>>>>>>>     }
>> >> >>> >>>>>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> I might be missing something for the Unbounded SDF 
>> >> >>> >>>>>>>>>>>>>>>>> case which is the only case
>> >> >>> >>>>>>>>>>>>>>>>> we have not explored so far. I think one easy way 
>> >> >>> >>>>>>>>>>>>>>>>> to see the limitations would
>> >> >>> >>>>>>>>>>>>>>>>> be in the ongoing KafkaIO SDF based implementation 
>> >> >>> >>>>>>>>>>>>>>>>> to try to map
>> >> >>> >>>>>>>>>>>>>>>>> KafkaSourceDescriptor to do the extra 
>> >> >>> >>>>>>>>>>>>>>>>> PCollection<Read> and the Read logic on
>> >> >>> >>>>>>>>>>>>>>>>> the ReadAll with the SDF to see which constraints 
>> >> >>> >>>>>>>>>>>>>>>>> we hit, the polymorphic ones
>> >> >>> >>>>>>>>>>>>>>>>> will be there for sure, maybe others will appear 
>> >> >>> >>>>>>>>>>>>>>>>> (not sure). However it would be
>> >> >>> >>>>>>>>>>>>>>>>> interesting to see if we have a real gain in the 
>> >> >>> >>>>>>>>>>>>>>>>> maintenance points, but well
>> >> >>> >>>>>>>>>>>>>>>>> let’s not forget also that KafkaIO has a LOT of 
>> >> >>> >>>>>>>>>>>>>>>>> knobs so probably the generic
>> >> >>> >>>>>>>>>>>>>>>>> implementation could be relatively complex.
>> >> >>> >>>>>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik 
>> >> >>> >>>>>>>>>>>>>>>>> <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >
>> >> >>> >>>>>>>>>>>>>>>>> > I had mentioned that approach 1 and approach 2 
>> >> >>> >>>>>>>>>>>>>>>>> > work for cross language. The difference being 
>> >> >>> >>>>>>>>>>>>>>>>> > that the cross language transform would take a 
>> >> >>> >>>>>>>>>>>>>>>>> > well known definition and convert it to the Read 
>> >> >>> >>>>>>>>>>>>>>>>> > transform. A normal user would have a pipeline 
>> >> >>> >>>>>>>>>>>>>>>>> > that would look like:
>> >> >>> >>>>>>>>>>>>>>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> 
>> >> >>> >>>>>>>>>>>>>>>>> > PCollection<Output>
>> >> >>> >>>>>>>>>>>>>>>>> > 2: PCollection<SourceDescriptor> -> 
>> >> >>> >>>>>>>>>>>>>>>>> > PTransform(ReadAll) -> PCollection<Output>
>> >> >>> >>>>>>>>>>>>>>>>> >
>> >> >>> >>>>>>>>>>>>>>>>> > And in the cross language case this would look 
>> >> >>> >>>>>>>>>>>>>>>>> > like:
>> >> >>> >>>>>>>>>>>>>>>>> > 1: PCollection<Row of SourceDescriptor> -> 
>> >> >>> >>>>>>>>>>>>>>>>> > PTransform(Convert Row to Read) -> 
>> >> >>> >>>>>>>>>>>>>>>>> > PCollection<Read> -> PTransform(ReadAll) -> 
>> >> >>> >>>>>>>>>>>>>>>>> > PCollection<Output>
>> >> >>> >>>>>>>>>>>>>>>>> > 2: PCollection<Row of SourceDescriptor> -> 
>> >> >>> >>>>>>>>>>>>>>>>> > PTransform(Convert Row to SourceDescriptor) -> 
>> >> >>> >>>>>>>>>>>>>>>>> > PCollection<SourceDescriptor> -> 
>> >> >>> >>>>>>>>>>>>>>>>> > PTransform(ReadAll) -> PCollection<Output>*
>> >> >>> >>>>>>>>>>>>>>>>> > * note that PTransform(Convert Row to 
>> >> >>> >>>>>>>>>>>>>>>>> > SourceDescriptor) only exists since we haven't 
>> >> >>> >>>>>>>>>>>>>>>>> > solved how to use schemas with language bound 
>> >> >>> >>>>>>>>>>>>>>>>> > types in a cross language way. SchemaCoder isn't 
>> >> >>> >>>>>>>>>>>>>>>>> > portable but RowCoder is which is why the 
>> >> >>> >>>>>>>>>>>>>>>>> > conversion step exists. We could have a solution 
>> >> >>> >>>>>>>>>>>>>>>>> > for this at some point in time.
>> >> >>> >>>>>>>>>>>>>>>>> >
>> >> >>> >>>>>>>>>>>>>>>>> > My concern with using Read was around:
>> >> >>> >>>>>>>>>>>>>>>>> > a) Do all properties set on a Read apply to the 
>> >> >>> >>>>>>>>>>>>>>>>> > ReadAll? For example, the Kafka Read 
>> >> >>> >>>>>>>>>>>>>>>>> > implementation allows you to set the key and 
>> >> >>> >>>>>>>>>>>>>>>>> > value deserializers which are also used to 
>> >> >>> >>>>>>>>>>>>>>>>> > dictate the output PCollection type. It also 
>> >> >>> >>>>>>>>>>>>>>>>> > allows you to set how the watermark should be 
>> >> >>> >>>>>>>>>>>>>>>>> > computed. Technically a user may want the 
>> >> >>> >>>>>>>>>>>>>>>>> > watermark computation to be configurable per Read 
>> >> >>> >>>>>>>>>>>>>>>>> > and they may also want an output type which is 
>> >> >>> >>>>>>>>>>>>>>>>> > polymorphic (e.g. PCollection<Serializable>).
>> >> >>> >>>>>>>>>>>>>>>>> > b) Read extends PTransform which brings its own 
>> >> >>> >>>>>>>>>>>>>>>>> > object modelling concerns.
>> >> >>> >>>>>>>>>>>>>>>>> >
>> >> >>> >>>>>>>>>>>>>>>>> > During the implementations of 
>> >> >>> >>>>>>>>>>>>>>>>> > ReadAll(PCollection<Read>), was it discovered 
>> >> >>> >>>>>>>>>>>>>>>>> > that some properties became runtime errors or 
>> >> >>> >>>>>>>>>>>>>>>>> > were ignored if they were set? If no, then the 
>> >> >>> >>>>>>>>>>>>>>>>> > code deduplication is likely worth it because we 
>> >> >>> >>>>>>>>>>>>>>>>> > also get a lot of javadoc deduplication, but if 
>> >> >>> >>>>>>>>>>>>>>>>> > yes is this an acceptable user experience?
>> >> >>> >>>>>>>>>>>>>>>>> >
>> >> >>> >>>>>>>>>>>>>>>>> >
>> >> >>> >>>>>>>>>>>>>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko 
>> >> >>> >>>>>>>>>>>>>>>>> > <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >>
>> >> >>> >>>>>>>>>>>>>>>>> >> I believe that the initial goal of unifying 
>> >> >>> >>>>>>>>>>>>>>>>> >> ReadAll as a general 
>> >> >>> >>>>>>>>>>>>>>>>> >> "PTransform<PCollection<Read>, 
>> >> >>> >>>>>>>>>>>>>>>>> >> PCollection<OutputType>>” was to reduce the 
>> >> >>> >>>>>>>>>>>>>>>>> >> amount of code duplication and error-prone 
>> >> >>> >>>>>>>>>>>>>>>>> >> approach related to this. It makes much sense 
>> >> >>> >>>>>>>>>>>>>>>>> >> since usually we have all needed configuration 
>> >> >>> >>>>>>>>>>>>>>>>> >> set in Read objects and, as Ismaeil mentioned, 
>> >> >>> >>>>>>>>>>>>>>>>> >> ReadAll will consist mostly of only 
>> >> >>> >>>>>>>>>>>>>>>>> >> Split-Shuffle-Read stages.  So this case usually 
>> >> >>> >>>>>>>>>>>>>>>>> >> can be unified by using PCollection<Read> as 
>> >> >>> >>>>>>>>>>>>>>>>> >> input.
>> >> >>> >>>>>>>>>>>>>>>>> >>
>> >> >>> >>>>>>>>>>>>>>>>> >> On the other hand, we have another need to use 
>> >> >>> >>>>>>>>>>>>>>>>> >> Java IOs as cross-language transforms (as Luke 
>> >> >>> >>>>>>>>>>>>>>>>> >> described) which seems only partly in common 
>> >> >>> >>>>>>>>>>>>>>>>> >> with previous pattern of ReadAll using.
>> >> >>> >>>>>>>>>>>>>>>>> >>
>> >> >>> >>>>>>>>>>>>>>>>> >> I’d be more in favour to have only one concept 
>> >> >>> >>>>>>>>>>>>>>>>> >> of read configuration for all needs but seems 
>> >> >>> >>>>>>>>>>>>>>>>> >> it’s not easy and I’d be more in favour with 
>> >> >>> >>>>>>>>>>>>>>>>> >> Luke and Boyuan approach with schema. Though, 
>> >> >>> >>>>>>>>>>>>>>>>> >> maybe ReadAll is not a very suitable name in 
>> >> >>> >>>>>>>>>>>>>>>>> >> this case because it will can bring some 
>> >> >>> >>>>>>>>>>>>>>>>> >> confusions related to previous pattern of 
>> >> >>> >>>>>>>>>>>>>>>>> >> ReadAll uses.
>> >> >>> >>>>>>>>>>>>>>>>> >>
>> >> >>> >>>>>>>>>>>>>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang 
>> >> >>> >>>>>>>>>>>>>>>>> >> <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >>
>> >> >>> >>>>>>>>>>>>>>>>> >> Sorry for the typo. I mean I think we can go 
>> >> >>> >>>>>>>>>>>>>>>>> >> with (3) and (4): use the data type that is 
>> >> >>> >>>>>>>>>>>>>>>>> >> schema-aware as the input of ReadAll.
>> >> >>> >>>>>>>>>>>>>>>>> >>
>> >> >>> >>>>>>>>>>>>>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang 
>> >> >>> >>>>>>>>>>>>>>>>> >> <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >>>
>> >> >>> >>>>>>>>>>>>>>>>> >>> Thanks for the summary, Cham!
>> >> >>> >>>>>>>>>>>>>>>>> >>>
>> >> >>> >>>>>>>>>>>>>>>>> >>> I think we can go with (2) and (4): use the 
>> >> >>> >>>>>>>>>>>>>>>>> >>> data type that is schema-aware as the input of 
>> >> >>> >>>>>>>>>>>>>>>>> >>> ReadAll.
>> >> >>> >>>>>>>>>>>>>>>>> >>>
>> >> >>> >>>>>>>>>>>>>>>>> >>> Converting Read into ReadAll helps us to stick 
>> >> >>> >>>>>>>>>>>>>>>>> >>> with SDF-like IO. But only having  (3) is not 
>> >> >>> >>>>>>>>>>>>>>>>> >>> enough to solve the problem of using ReadAll in 
>> >> >>> >>>>>>>>>>>>>>>>> >>> x-lang case.
>> >> >>> >>>>>>>>>>>>>>>>> >>>
>> >> >>> >>>>>>>>>>>>>>>>> >>> The key point of ReadAll is that the input type 
>> >> >>> >>>>>>>>>>>>>>>>> >>> of ReadAll should be able to cross language 
>> >> >>> >>>>>>>>>>>>>>>>> >>> boundaries and have compatibilities of 
>> >> >>> >>>>>>>>>>>>>>>>> >>> updating/downgrading. After investigating some 
>> >> >>> >>>>>>>>>>>>>>>>> >>> possibilities(pure java pojo with custom coder, 
>> >> >>> >>>>>>>>>>>>>>>>> >>> protobuf, row/schema) in Kafka usage, we find 
>> >> >>> >>>>>>>>>>>>>>>>> >>> that row/schema fits our needs most. Here comes 
>> >> >>> >>>>>>>>>>>>>>>>> >>> (4). I believe that using Read as input of 
>> >> >>> >>>>>>>>>>>>>>>>> >>> ReadAll makes sense in some cases, but I also 
>> >> >>> >>>>>>>>>>>>>>>>> >>> think not all IOs have the same need. I would 
>> >> >>> >>>>>>>>>>>>>>>>> >>> treat Read as a special type as long as the 
>> >> >>> >>>>>>>>>>>>>>>>> >>> Read is schema-aware.
>> >> >>> >>>>>>>>>>>>>>>>> >>>
>> >> >>> >>>>>>>>>>>>>>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara 
>> >> >>> >>>>>>>>>>>>>>>>> >>> Jayalath <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>> I see. So it seems like there are three 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> options discussed so far when it comes to 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> defining source descriptors for ReadAll type 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> transforms
>> >> >>> >>>>>>>>>>>>>>>>> >>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>> (1) Use Read PTransform as the element type of 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> the input PCollection
>> >> >>> >>>>>>>>>>>>>>>>> >>>> (2) Use a POJO that describes the source as 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> the data element of the input PCollection
>> >> >>> >>>>>>>>>>>>>>>>> >>>> (3) Provide a converter as a function to the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> Read transform which essentially will convert 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> it to a ReadAll (what Eugene mentioned)
>> >> >>> >>>>>>>>>>>>>>>>> >>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>> I feel like (3) is more suitable for a related 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> set of source descriptions such as files.
>> >> >>> >>>>>>>>>>>>>>>>> >>>> (1) will allow most code-reuse but seems like 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> will make it hard to use the ReadAll transform 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> as a cross-language transform and will break 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> the separation of construction time and 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> runtime constructs
>> >> >>> >>>>>>>>>>>>>>>>> >>>> (2) could result to less code reuse if not 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> careful but will make the transform easier to 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> be used as a cross-language transform without 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> additional modifications
>> >> >>> >>>>>>>>>>>>>>>>> >>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>> Also, with SDF, we can create ReadAll-like 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> transforms that are more efficient. So we 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> might be able to just define all sources in 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> that format and make Read transforms just an 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> easy to use composite built on top of that (by 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> adding a preceding Create transform).
>> >> >>> >>>>>>>>>>>>>>>>> >>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>> Thanks,
>> >> >>> >>>>>>>>>>>>>>>>> >>>> Cham
>> >> >>> >>>>>>>>>>>>>>>>> >>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>> I believe we do require PTransforms to be 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>> serializable since anonymous DoFns typically 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>> capture the enclosing PTransform.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>> Jayalath <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Seems like Read in PCollection<Read> refers 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> to a transform, at least here: 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> I'm in favour of separating construction 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> time transforms from execution time data 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> objects that we store in PCollections as 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Luke mentioned. Also, we don't guarantee 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> that PTransform is serializable so users 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> have the additional complexity of providing 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> a corder whenever a PTransform is used as a 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> data object.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Also, agree with Boyuan that using simple 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Java objects that are convertible to Beam 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Rows allow us to make these transforms 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> available to other SDKs through the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> cross-language transforms. Using transforms 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> or complex sources as data objects will 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> probably make this difficult.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Thanks,
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Cham
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Zhang <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> Hi Ismael,
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> I think the ReadAll in the IO connector 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> refers to the IO with SDF implementation 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> despite the type of input, where Read 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> refers to UnboundedSource.  One major 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> pushback of using KafkaIO.Read as source 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> description is that not all configurations 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> of KafkaIO.Read are meaningful to populate 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> during execution time. Also when thinking 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> about x-lang useage, making source 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> description across language boundaries is 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> also necessary.  As Luke mentioned, it's 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> quite easy to infer a Schema from an 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> AutoValue object: 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> KafkaSourceDescription.java. Then the coder 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> of this schema-aware object will be a 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> SchemaCoder. When crossing language 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> boundaries, it's also easy to convert a Row 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> into the source description: 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> Convert.fromRows.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> To provide additional context, the KafkaIO 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> ReadAll transform takes a 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> PCollection<KafkaSourceDescriptor>. This 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> KafkaSourceDescriptor is a POJO that 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> contains the configurable parameters for 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> reading from Kafka. This is different from 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> the pattern that Ismael listed because 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> they take PCollection<Read> as input and 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> the Read is the same as the Read 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> PTransform class used for the non read all 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> case.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> The KafkaSourceDescriptor does lead to 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> duplication since parameters used to 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> configure the transform have to be copied 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> over to the source descriptor but 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> decouples how a transform is specified 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> from the object that describes what needs 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> to be done. I believe Ismael's point is 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> that we wouldn't need such a decoupling.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> Another area that hasn't been discussed 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> and I believe is a non-issue is that the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> Beam Java SDK has the most IO connectors 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> and we would want to use the IO 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> implementations within Beam Go and Beam 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> Python. This brings in its own set of 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> issues related to versioning and 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> compatibility for the wire format and how 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> one parameterizes such transforms. The 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> wire format issue can be solved with 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> either approach by making sure that the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> cross language expansion always takes the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> well known format (whatever it may be) and 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> converts it into 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> Read/KafkaSourceDescriptor/... object that 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> is then passed to the ReadAll transform. 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> Boyuan has been looking to make the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> KafkaSourceDescriptor have a schema so it 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> can be represented as a row and this can 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> be done easily using the AutoValue 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> integration (I don't believe there is 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> anything preventing someone from writing a 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> schema row -> Read -> row adapter or also 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> using the AutoValue configuration if the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> transform is also an AutoValue).
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> I would be more for the code duplication 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> and separation of concerns provided by 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> using a different object to represent the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> contents of the PCollection from the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> pipeline construction time PTransform.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> Kirpichov <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> Hi Ismael,
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> Thanks for taking this on. Have you 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> considered an approach similar (or dual) 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> to FileIO.write(), where we in a sense 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> also have to configure a dynamic number 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> different IO transforms of the same type 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> (file writes)?
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> E.g. how in this example we configure 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> many aspects of many file writes:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType,
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>  Transaction>writeDynamic()
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>      .by(Transaction::getType)
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>      .via(tx -> 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> tx.getType().toFields(tx),  // Convert 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> the data to be written to CSVSink
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>           type -> new 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> CSVSink(type.getFieldNames()))
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>      .to(".../path/to/")
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>      .withNaming(type -> 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> defaultNaming(type + "-transactions", 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> ".csv"));
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> we could do something similar for many 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> JdbcIO reads:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> PCollection<Bar> bars;  // user-specific 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> type from which all the read parameters 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> can be inferred
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> PCollection<Moo> moos = 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> bars.apply(JdbcIO.<Bar, Moo>readAll()
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>   .fromQuery(bar -> ...compute query for 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> this bar...)
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>   .withMapper((bar, resultSet) -> new 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> Moo(...))
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>   .withBatchSize(bar -> ...compute batch 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> size for this bar...)
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>   ...etc);
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> Mejía <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Hello,
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> (my excuses for the long email but this 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> requires context)
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> As part of the move from Source based 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> IOs to DoFn based ones. One pattern
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> emerged due to the composable nature of 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> DoFn. The idea is to have a different
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> kind of composable reads where we take a 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection of different sorts of
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> intermediate specifications e.g. tables, 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> queries, etc, for example:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> JdbcIO:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<ParameterT>, 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<OutputT>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> RedisIO:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<String>, 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<KV<String, String>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<HBaseQuery>, 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<Result>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> These patterns enabled richer use cases 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> like doing multiple queries in the same
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Pipeline, querying based on key patterns 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> or querying from multiple tables at the
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> same time but came with some maintenance 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> issues:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> - We ended up needing to add to the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll transforms the parameters for
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   missing information so we ended up 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> with lots of duplicated with methods and
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   error-prone code from the Read 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> transforms into the ReadAll transforms.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> - When you require new parameters you 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> have to expand the input parameters of 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> the
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   intermediary specification into 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> something that resembles the full `Read`
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   definition for example imagine you 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> want to read from multiple tables or
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   servers as part of the same pipeline 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> but this was not in the intermediate
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   specification you end up adding those 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> extra methods (duplicating more code)
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   just o get close to the be like the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Read full spec.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> - If new parameters are added to the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Read method we end up adding them
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   systematically to the ReadAll 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> transform too so they are taken into 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> account.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Due to these issues I recently did a 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> change to test a new approach that is
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> simpler, more complete and maintainable. 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> The code became:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<Read>, 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<Result>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> With this approach users gain benefits 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> of improvements on parameters of normal
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Read because they count with the full 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Read parameters. But of course there are
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> some minor caveats:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> 1. You need to push some information 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> into normal Reads for example
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    partition boundaries information or 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Restriction information (in the SDF
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    case).  Notice that this consistent 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> approach of ReadAll produces a simple
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    pattern that ends up being almost 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> reusable between IOs (e.g. the    non-SDF
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    case):
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   public static class ReadAll extends 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<Read>,
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<SolrDocument>> {
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>     @Override
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>     public PCollection<SolrDocument> 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> expand(PCollection<Read> input) {
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>       return input
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Split", ParDo.of(new 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> SplitFn()))
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Reshuffle", 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Reshuffle.viaRandomKey())
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Read", ParDo.of(new 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadFn()));
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>     }
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   }
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> 2. If you are using Generic types for 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> the results ReadAll you must have the
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    Coders used in its definition and 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> require consistent types from the data
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    sources, in practice this means we 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> need to add extra withCoder method(s) on
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    ReadAll but not the full specs.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> follow this ReadAll pattern. RedisIO
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> and CassandraIO have already WIP PRs to 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> do so. So I wanted to bring this subject
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> to the mailing list to see your 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> opinions, and if you see any sort of 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> issues that
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> we might be missing with this idea.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Also I would like to see if we have 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> consensus to start using consistently the
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> terminology of ReadAll transforms based 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> on Read and the readAll() method for new
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> IOs (at this point probably outdoing 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> this in the only remaining inconsistent
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> place in JdbcIO might not be a good idea 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> but apart of this we should be ok).
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> I mention this because the recent PR on 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> KafkaIO based on SDF is doing something
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> similar to the old pattern but being 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> called ReadAll and maybe it is worth to 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> be
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> consistent for the benefit of users.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Regards,
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Ismaël
>> >> >>> >>>>>>>>>>>>>>>>> >>
>> >> >>> >>>>>>>>>>>>>>>>> >>
>> >> >>> >>>>>>>>>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik 
>> >> >>> >>>>>>>>>>>>>>>>> <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >
>> >> >>> >>>>>>>>>>>>>>>>> > I had mentioned that approach 1 and approach 2 
>> >> >>> >>>>>>>>>>>>>>>>> > work for cross language. The difference being 
>> >> >>> >>>>>>>>>>>>>>>>> > that the cross language transform would take a 
>> >> >>> >>>>>>>>>>>>>>>>> > well known definition and convert it to the Read 
>> >> >>> >>>>>>>>>>>>>>>>> > transform. A normal user would have a pipeline 
>> >> >>> >>>>>>>>>>>>>>>>> > that would look like:
>> >> >>> >>>>>>>>>>>>>>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> 
>> >> >>> >>>>>>>>>>>>>>>>> > PCollection<Output>
>> >> >>> >>>>>>>>>>>>>>>>> > 2: PCollection<SourceDescriptor> -> 
>> >> >>> >>>>>>>>>>>>>>>>> > PTransform(ReadAll) -> PCollection<Output>
>> >> >>> >>>>>>>>>>>>>>>>> >
>> >> >>> >>>>>>>>>>>>>>>>> > And in the cross language case this would look 
>> >> >>> >>>>>>>>>>>>>>>>> > like:
>> >> >>> >>>>>>>>>>>>>>>>> > 1: PCollection<Row of SourceDescriptor> -> 
>> >> >>> >>>>>>>>>>>>>>>>> > PTransform(Convert Row to Read) -> 
>> >> >>> >>>>>>>>>>>>>>>>> > PCollection<Read> -> PTransform(ReadAll) -> 
>> >> >>> >>>>>>>>>>>>>>>>> > PCollection<Output>
>> >> >>> >>>>>>>>>>>>>>>>> > 2: PCollection<Row of SourceDescriptor> -> 
>> >> >>> >>>>>>>>>>>>>>>>> > PTransform(Convert Row to SourceDescriptor) -> 
>> >> >>> >>>>>>>>>>>>>>>>> > PCollection<SourceDescriptor> -> 
>> >> >>> >>>>>>>>>>>>>>>>> > PTransform(ReadAll) -> PCollection<Output>*
>> >> >>> >>>>>>>>>>>>>>>>> > * note that PTransform(Convert Row to 
>> >> >>> >>>>>>>>>>>>>>>>> > SourceDescriptor) only exists since we haven't 
>> >> >>> >>>>>>>>>>>>>>>>> > solved how to use schemas with language bound 
>> >> >>> >>>>>>>>>>>>>>>>> > types in a cross language way. SchemaCoder isn't 
>> >> >>> >>>>>>>>>>>>>>>>> > portable but RowCoder is which is why the 
>> >> >>> >>>>>>>>>>>>>>>>> > conversion step exists. We could have a solution 
>> >> >>> >>>>>>>>>>>>>>>>> > for this at some point in time.
>> >> >>> >>>>>>>>>>>>>>>>> >
>> >> >>> >>>>>>>>>>>>>>>>> > My concern with using Read was around:
>> >> >>> >>>>>>>>>>>>>>>>> > a) Do all properties set on a Read apply to the 
>> >> >>> >>>>>>>>>>>>>>>>> > ReadAll? For example, the Kafka Read 
>> >> >>> >>>>>>>>>>>>>>>>> > implementation allows you to set the key and 
>> >> >>> >>>>>>>>>>>>>>>>> > value deserializers which are also used to 
>> >> >>> >>>>>>>>>>>>>>>>> > dictate the output PCollection type. It also 
>> >> >>> >>>>>>>>>>>>>>>>> > allows you to set how the watermark should be 
>> >> >>> >>>>>>>>>>>>>>>>> > computed. Technically a user may want the 
>> >> >>> >>>>>>>>>>>>>>>>> > watermark computation to be configurable per Read 
>> >> >>> >>>>>>>>>>>>>>>>> > and they may also want an output type which is 
>> >> >>> >>>>>>>>>>>>>>>>> > polymorphic (e.g. PCollection<Serializable>).
>> >> >>> >>>>>>>>>>>>>>>>> > b) Read extends PTransform which brings its own 
>> >> >>> >>>>>>>>>>>>>>>>> > object modelling concerns.
>> >> >>> >>>>>>>>>>>>>>>>> >
>> >> >>> >>>>>>>>>>>>>>>>> > During the implementations of 
>> >> >>> >>>>>>>>>>>>>>>>> > ReadAll(PCollection<Read>), was it discovered 
>> >> >>> >>>>>>>>>>>>>>>>> > that some properties became runtime errors or 
>> >> >>> >>>>>>>>>>>>>>>>> > were ignored if they were set? If no, then the 
>> >> >>> >>>>>>>>>>>>>>>>> > code deduplication is likely worth it because we 
>> >> >>> >>>>>>>>>>>>>>>>> > also get a lot of javadoc deduplication, but if 
>> >> >>> >>>>>>>>>>>>>>>>> > yes is this an acceptable user experience?
>> >> >>> >>>>>>>>>>>>>>>>> >
>> >> >>> >>>>>>>>>>>>>>>>> >
>> >> >>> >>>>>>>>>>>>>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko 
>> >> >>> >>>>>>>>>>>>>>>>> > <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >>
>> >> >>> >>>>>>>>>>>>>>>>> >> I believe that the initial goal of unifying 
>> >> >>> >>>>>>>>>>>>>>>>> >> ReadAll as a general 
>> >> >>> >>>>>>>>>>>>>>>>> >> "PTransform<PCollection<Read>, 
>> >> >>> >>>>>>>>>>>>>>>>> >> PCollection<OutputType>>” was to reduce the 
>> >> >>> >>>>>>>>>>>>>>>>> >> amount of code duplication and error-prone 
>> >> >>> >>>>>>>>>>>>>>>>> >> approach related to this. It makes much sense 
>> >> >>> >>>>>>>>>>>>>>>>> >> since usually we have all needed configuration 
>> >> >>> >>>>>>>>>>>>>>>>> >> set in Read objects and, as Ismaeil mentioned, 
>> >> >>> >>>>>>>>>>>>>>>>> >> ReadAll will consist mostly of only 
>> >> >>> >>>>>>>>>>>>>>>>> >> Split-Shuffle-Read stages.  So this case usually 
>> >> >>> >>>>>>>>>>>>>>>>> >> can be unified by using PCollection<Read> as 
>> >> >>> >>>>>>>>>>>>>>>>> >> input.
>> >> >>> >>>>>>>>>>>>>>>>> >>
>> >> >>> >>>>>>>>>>>>>>>>> >> On the other hand, we have another need to use 
>> >> >>> >>>>>>>>>>>>>>>>> >> Java IOs as cross-language transforms (as Luke 
>> >> >>> >>>>>>>>>>>>>>>>> >> described) which seems only partly in common 
>> >> >>> >>>>>>>>>>>>>>>>> >> with previous pattern of ReadAll using.
>> >> >>> >>>>>>>>>>>>>>>>> >>
>> >> >>> >>>>>>>>>>>>>>>>> >> I’d be more in favour to have only one concept 
>> >> >>> >>>>>>>>>>>>>>>>> >> of read configuration for all needs but seems 
>> >> >>> >>>>>>>>>>>>>>>>> >> it’s not easy and I’d be more in favour with 
>> >> >>> >>>>>>>>>>>>>>>>> >> Luke and Boyuan approach with schema. Though, 
>> >> >>> >>>>>>>>>>>>>>>>> >> maybe ReadAll is not a very suitable name in 
>> >> >>> >>>>>>>>>>>>>>>>> >> this case because it will can bring some 
>> >> >>> >>>>>>>>>>>>>>>>> >> confusions related to previous pattern of 
>> >> >>> >>>>>>>>>>>>>>>>> >> ReadAll uses.
>> >> >>> >>>>>>>>>>>>>>>>> >>
>> >> >>> >>>>>>>>>>>>>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang 
>> >> >>> >>>>>>>>>>>>>>>>> >> <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >>
>> >> >>> >>>>>>>>>>>>>>>>> >> Sorry for the typo. I mean I think we can go 
>> >> >>> >>>>>>>>>>>>>>>>> >> with (3) and (4): use the data type that is 
>> >> >>> >>>>>>>>>>>>>>>>> >> schema-aware as the input of ReadAll.
>> >> >>> >>>>>>>>>>>>>>>>> >>
>> >> >>> >>>>>>>>>>>>>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang 
>> >> >>> >>>>>>>>>>>>>>>>> >> <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >>>
>> >> >>> >>>>>>>>>>>>>>>>> >>> Thanks for the summary, Cham!
>> >> >>> >>>>>>>>>>>>>>>>> >>>
>> >> >>> >>>>>>>>>>>>>>>>> >>> I think we can go with (2) and (4): use the 
>> >> >>> >>>>>>>>>>>>>>>>> >>> data type that is schema-aware as the input of 
>> >> >>> >>>>>>>>>>>>>>>>> >>> ReadAll.
>> >> >>> >>>>>>>>>>>>>>>>> >>>
>> >> >>> >>>>>>>>>>>>>>>>> >>> Converting Read into ReadAll helps us to stick 
>> >> >>> >>>>>>>>>>>>>>>>> >>> with SDF-like IO. But only having  (3) is not 
>> >> >>> >>>>>>>>>>>>>>>>> >>> enough to solve the problem of using ReadAll in 
>> >> >>> >>>>>>>>>>>>>>>>> >>> x-lang case.
>> >> >>> >>>>>>>>>>>>>>>>> >>>
>> >> >>> >>>>>>>>>>>>>>>>> >>> The key point of ReadAll is that the input type 
>> >> >>> >>>>>>>>>>>>>>>>> >>> of ReadAll should be able to cross language 
>> >> >>> >>>>>>>>>>>>>>>>> >>> boundaries and have compatibilities of 
>> >> >>> >>>>>>>>>>>>>>>>> >>> updating/downgrading. After investigating some 
>> >> >>> >>>>>>>>>>>>>>>>> >>> possibilities(pure java pojo with custom coder, 
>> >> >>> >>>>>>>>>>>>>>>>> >>> protobuf, row/schema) in Kafka usage, we find 
>> >> >>> >>>>>>>>>>>>>>>>> >>> that row/schema fits our needs most. Here comes 
>> >> >>> >>>>>>>>>>>>>>>>> >>> (4). I believe that using Read as input of 
>> >> >>> >>>>>>>>>>>>>>>>> >>> ReadAll makes sense in some cases, but I also 
>> >> >>> >>>>>>>>>>>>>>>>> >>> think not all IOs have the same need. I would 
>> >> >>> >>>>>>>>>>>>>>>>> >>> treat Read as a special type as long as the 
>> >> >>> >>>>>>>>>>>>>>>>> >>> Read is schema-aware.
>> >> >>> >>>>>>>>>>>>>>>>> >>>
>> >> >>> >>>>>>>>>>>>>>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara 
>> >> >>> >>>>>>>>>>>>>>>>> >>> Jayalath <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>> I see. So it seems like there are three 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> options discussed so far when it comes to 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> defining source descriptors for ReadAll type 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> transforms
>> >> >>> >>>>>>>>>>>>>>>>> >>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>> (1) Use Read PTransform as the element type of 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> the input PCollection
>> >> >>> >>>>>>>>>>>>>>>>> >>>> (2) Use a POJO that describes the source as 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> the data element of the input PCollection
>> >> >>> >>>>>>>>>>>>>>>>> >>>> (3) Provide a converter as a function to the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> Read transform which essentially will convert 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> it to a ReadAll (what Eugene mentioned)
>> >> >>> >>>>>>>>>>>>>>>>> >>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>> I feel like (3) is more suitable for a related 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> set of source descriptions such as files.
>> >> >>> >>>>>>>>>>>>>>>>> >>>> (1) will allow most code-reuse but seems like 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> will make it hard to use the ReadAll transform 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> as a cross-language transform and will break 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> the separation of construction time and 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> runtime constructs
>> >> >>> >>>>>>>>>>>>>>>>> >>>> (2) could result to less code reuse if not 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> careful but will make the transform easier to 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> be used as a cross-language transform without 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> additional modifications
>> >> >>> >>>>>>>>>>>>>>>>> >>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>> Also, with SDF, we can create ReadAll-like 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> transforms that are more efficient. So we 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> might be able to just define all sources in 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> that format and make Read transforms just an 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> easy to use composite built on top of that (by 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> adding a preceding Create transform).
>> >> >>> >>>>>>>>>>>>>>>>> >>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>> Thanks,
>> >> >>> >>>>>>>>>>>>>>>>> >>>> Cham
>> >> >>> >>>>>>>>>>>>>>>>> >>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>> I believe we do require PTransforms to be 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>> serializable since anonymous DoFns typically 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>> capture the enclosing PTransform.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>> Jayalath <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Seems like Read in PCollection<Read> refers 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> to a transform, at least here: 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> I'm in favour of separating construction 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> time transforms from execution time data 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> objects that we store in PCollections as 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Luke mentioned. Also, we don't guarantee 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> that PTransform is serializable so users 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> have the additional complexity of providing 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> a corder whenever a PTransform is used as a 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> data object.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Also, agree with Boyuan that using simple 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Java objects that are convertible to Beam 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Rows allow us to make these transforms 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> available to other SDKs through the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> cross-language transforms. Using transforms 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> or complex sources as data objects will 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> probably make this difficult.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Thanks,
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Cham
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Zhang <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> Hi Ismael,
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> I think the ReadAll in the IO connector 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> refers to the IO with SDF implementation 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> despite the type of input, where Read 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> refers to UnboundedSource.  One major 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> pushback of using KafkaIO.Read as source 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> description is that not all configurations 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> of KafkaIO.Read are meaningful to populate 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> during execution time. Also when thinking 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> about x-lang useage, making source 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> description across language boundaries is 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> also necessary.  As Luke mentioned, it's 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> quite easy to infer a Schema from an 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> AutoValue object: 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> KafkaSourceDescription.java. Then the coder 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> of this schema-aware object will be a 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> SchemaCoder. When crossing language 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> boundaries, it's also easy to convert a Row 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> into the source description: 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> Convert.fromRows.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> To provide additional context, the KafkaIO 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> ReadAll transform takes a 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> PCollection<KafkaSourceDescriptor>. This 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> KafkaSourceDescriptor is a POJO that 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> contains the configurable parameters for 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> reading from Kafka. This is different from 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> the pattern that Ismael listed because 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> they take PCollection<Read> as input and 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> the Read is the same as the Read 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> PTransform class used for the non read all 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> case.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> The KafkaSourceDescriptor does lead to 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> duplication since parameters used to 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> configure the transform have to be copied 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> over to the source descriptor but 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> decouples how a transform is specified 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> from the object that describes what needs 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> to be done. I believe Ismael's point is 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> that we wouldn't need such a decoupling.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> Another area that hasn't been discussed 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> and I believe is a non-issue is that the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> Beam Java SDK has the most IO connectors 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> and we would want to use the IO 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> implementations within Beam Go and Beam 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> Python. This brings in its own set of 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> issues related to versioning and 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> compatibility for the wire format and how 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> one parameterizes such transforms. The 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> wire format issue can be solved with 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> either approach by making sure that the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> cross language expansion always takes the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> well known format (whatever it may be) and 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> converts it into 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> Read/KafkaSourceDescriptor/... object that 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> is then passed to the ReadAll transform. 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> Boyuan has been looking to make the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> KafkaSourceDescriptor have a schema so it 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> can be represented as a row and this can 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> be done easily using the AutoValue 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> integration (I don't believe there is 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> anything preventing someone from writing a 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> schema row -> Read -> row adapter or also 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> using the AutoValue configuration if the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> transform is also an AutoValue).
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> I would be more for the code duplication 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> and separation of concerns provided by 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> using a different object to represent the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> contents of the PCollection from the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> pipeline construction time PTransform.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> Kirpichov <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> Hi Ismael,
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> Thanks for taking this on. Have you 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> considered an approach similar (or dual) 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> to FileIO.write(), where we in a sense 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> also have to configure a dynamic number 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> different IO transforms of the same type 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> (file writes)?
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> E.g. how in this example we configure 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> many aspects of many file writes:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType,
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>  Transaction>writeDynamic()
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>      .by(Transaction::getType)
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>      .via(tx -> 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> tx.getType().toFields(tx),  // Convert 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> the data to be written to CSVSink
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>           type -> new 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> CSVSink(type.getFieldNames()))
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>      .to(".../path/to/")
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>      .withNaming(type -> 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> defaultNaming(type + "-transactions", 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> ".csv"));
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> we could do something similar for many 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> JdbcIO reads:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> PCollection<Bar> bars;  // user-specific 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> type from which all the read parameters 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> can be inferred
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> PCollection<Moo> moos = 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> bars.apply(JdbcIO.<Bar, Moo>readAll()
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>   .fromQuery(bar -> ...compute query for 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> this bar...)
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>   .withMapper((bar, resultSet) -> new 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> Moo(...))
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>   .withBatchSize(bar -> ...compute batch 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> size for this bar...)
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>   ...etc);
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> Mejía <[email protected]> wrote:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Hello,
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> (my excuses for the long email but this 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> requires context)
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> As part of the move from Source based 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> IOs to DoFn based ones. One pattern
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> emerged due to the composable nature of 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> DoFn. The idea is to have a different
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> kind of composable reads where we take a 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection of different sorts of
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> intermediate specifications e.g. tables, 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> queries, etc, for example:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> JdbcIO:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<ParameterT>, 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<OutputT>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> RedisIO:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<String>, 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<KV<String, String>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<HBaseQuery>, 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<Result>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> These patterns enabled richer use cases 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> like doing multiple queries in the same
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Pipeline, querying based on key patterns 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> or querying from multiple tables at the
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> same time but came with some maintenance 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> issues:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> - We ended up needing to add to the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll transforms the parameters for
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   missing information so we ended up 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> with lots of duplicated with methods and
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   error-prone code from the Read 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> transforms into the ReadAll transforms.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> - When you require new parameters you 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> have to expand the input parameters of 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> the
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   intermediary specification into 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> something that resembles the full `Read`
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   definition for example imagine you 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> want to read from multiple tables or
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   servers as part of the same pipeline 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> but this was not in the intermediate
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   specification you end up adding those 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> extra methods (duplicating more code)
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   just o get close to the be like the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Read full spec.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> - If new parameters are added to the 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Read method we end up adding them
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   systematically to the ReadAll 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> transform too so they are taken into 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> account.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Due to these issues I recently did a 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> change to test a new approach that is
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> simpler, more complete and maintainable. 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> The code became:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<Read>, 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<Result>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> With this approach users gain benefits 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> of improvements on parameters of normal
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Read because they count with the full 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Read parameters. But of course there are
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> some minor caveats:
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> 1. You need to push some information 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> into normal Reads for example
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    partition boundaries information or 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Restriction information (in the SDF
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    case).  Notice that this consistent 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> approach of ReadAll produces a simple
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    pattern that ends up being almost 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> reusable between IOs (e.g. the    non-SDF
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    case):
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   public static class ReadAll extends 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<Read>,
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<SolrDocument>> {
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>     @Override
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>     public PCollection<SolrDocument> 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> expand(PCollection<Read> input) {
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>       return input
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Split", ParDo.of(new 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> SplitFn()))
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Reshuffle", 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Reshuffle.viaRandomKey())
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Read", ParDo.of(new 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadFn()));
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>     }
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   }
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> 2. If you are using Generic types for 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> the results ReadAll you must have the
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    Coders used in its definition and 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> require consistent types from the data
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    sources, in practice this means we 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> need to add extra withCoder method(s) on
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    ReadAll but not the full specs.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> follow this ReadAll pattern. RedisIO
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> and CassandraIO have already WIP PRs to 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> do so. So I wanted to bring this subject
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> to the mailing list to see your 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> opinions, and if you see any sort of 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> issues that
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> we might be missing with this idea.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Also I would like to see if we have 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> consensus to start using consistently the
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> terminology of ReadAll transforms based 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> on Read and the readAll() method for new
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> IOs (at this point probably outdoing 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> this in the only remaining inconsistent
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> place in JdbcIO might not be a good idea 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> but apart of this we should be ok).
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> I mention this because the recent PR on 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> KafkaIO based on SDF is doing something
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> similar to the old pattern but being 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> called ReadAll and maybe it is worth to 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> be
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> consistent for the benefit of users.
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Regards,
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Ismaël
>> >> >>> >>>>>>>>>>>>>>>>> >>
>> >> >>> >>>>>>>>>>>>>>>>> >>
>> >> >>> >>>>>>>>>>>>>>>>>

Reply via email to