OK, I'm +0 on this change. Using the PTransform as an element is
probably better than duplicating the full API on another interface,
and I think it's worth getting this unblocked. This will require a Read2
if we have to add options in an upgrade-compatible way.
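
To make the trade-off concrete, here is a rough sketch of the
withDefaults/withOverrides resolution Luke suggested further down the
thread. It is plain Java with no Beam dependency; ReadConfig, resolve and
pick are hypothetical names chosen for illustration, not Beam APIs, and
the precedence order (overrides, then the element's own value, then
defaults) is one reading of that proposal rather than a settled design.

```java
import java.util.Optional;

// Hypothetical stand-in for an IO's Read configuration; NOT the Beam API.
// Each field is Optional so "unspecified" can be told apart from a value.
final class ReadConfig {
  final Optional<String> topic;
  final Optional<Long> startOffset;

  ReadConfig(Optional<String> topic, Optional<Long> startOffset) {
    this.topic = topic;
    this.startOffset = startOffset;
  }

  // Assumed precedence: overrides win regardless of the element,
  // then the element's own value, then the defaults.
  static ReadConfig resolve(ReadConfig element, ReadConfig defaults, ReadConfig overrides) {
    return new ReadConfig(
        pick(overrides.topic, element.topic, defaults.topic),
        pick(overrides.startOffset, element.startOffset, defaults.startOffset));
  }

  // Returns the first value that is present, in priority order.
  private static <T> Optional<T> pick(
      Optional<T> override, Optional<T> element, Optional<T> fallback) {
    if (override.isPresent()) {
      return override;
    }
    if (element.isPresent()) {
      return element;
    }
    return fallback;
  }
}
```

With defaults of topic "fallback" and startOffset 0, and an override of
startOffset 42, an element that only sets topic "orders" would resolve to
topic "orders" and startOffset 42.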

On Tue, Jul 7, 2020 at 3:19 PM Luke Cwik <[email protected]> wrote:
>
> Robert, you're correct in your understanding that the Read PTransform would 
> be encoded via the schema coder.
>
> Kenn, different serializers are ok as long as the output coder can 
> encode/decode the output type. Different watermark fns are also ok since it 
> is about computing the watermark for each individual source and won't impact 
> the watermark computed by other sources. Watermark advancement will still be 
> held back by the source that is furthest behind and still has the same 
> problems when a user chooses a watermark fn that is incompatible with the 
> windowing strategy for producing output (e.g. global window + default trigger 
> + streaming pipeline).
>
> Both are pretty close so if we started from scratch then it could go either 
> way but we aren't starting from scratch (I don't think a Beam 3.0 is likely 
> to happen in the next few years as there isn't enough stuff that we want to 
> remove vs the amount of stuff we would gain).
>
> On Tue, Jul 7, 2020 at 2:57 PM Kenneth Knowles <[email protected]> wrote:
>>
>> On Tue, Jul 7, 2020 at 2:24 PM Robert Bradshaw <[email protected]> wrote:
>>>
>>> On Tue, Jul 7, 2020 at 2:06 PM Luke Cwik <[email protected]> wrote:
>>> >
>>> > Robert, the intent is that the Read object would use a schema coder and 
>>> > for XLang purposes would be no different than a POJO.
>>>
>>> Just to clarify, you're saying that the Read PTransform would be
>>> encoded via the schema coder? That still feels a bit odd (and
>>> specifically if we were designing IO from scratch rather than
>>> adapting to what already exists would we choose to use PTransforms as
>>> elements?) but would solve the cross language issue.
>>
>>
>> I like this question. If we were designing from scratch, what would we do? 
>> Would we encourage users to feed Create.of(SourceDescriptor) into ReadAll? 
>> We would probably provide a friendly wrapper for reading one static thing, 
>> and call it Read. But it would probably have an API like 
>> Read.from(SourceDescriptor), thus eliminating duplicate documentation and 
>> boilerplate that Luke described while keeping the separation that Brian 
>> described and clarity around xlang environments. But I'm +0 on whatever has 
>> momentum. I think the main downside is the weirdness around 
>> serializers/watermarkFn/etc on Read. I am not sure how much this will cause 
>> users problems. It would be very ambitious of them to produce a 
>> PCollection<Read> where they had different fns per element...
>>
>> Kenn
>>
>>>
>>> > The issue of how to deal with closures applies to both equally and that 
>>> > is why I suggested to favor using data over closures. Once there is an 
>>> > implementation for how to deal with UDFs in an XLang world, this guidance 
>>> > can change.
>>> >
>>> > Kenn, I did mean specifying an enum for which the XLang expansion service 
>>> > would return a serialized blob of code. The XLang expansion service is 
>>> > responsible for returning an environment that contains all the necessary 
>>> > dependencies to execute the transforms and the serialized blob of code 
>>> > and hence would be a non-issue for the caller.
>>> >
>>> > From reviewing the SDF Kafka PR, the reduction in maintenance is 
>>> > definitely there (100s of lines of duplicated boilerplate and 
>>> > documentation).
>>> >
>>> > What are the next steps to get a resolution on this?
>>> >
>>> > On Thu, Jul 2, 2020 at 10:38 AM Robert Bradshaw <[email protected]> 
>>> > wrote:
>>> >>
>>> >> On Thu, Jul 2, 2020 at 10:26 AM Kenneth Knowles <[email protected]> wrote:
>>> >>>
>>> >>>
>>> >>> On Wed, Jul 1, 2020 at 4:17 PM Eugene Kirpichov <[email protected]> 
>>> >>> wrote:
>>> >>>>
>>> >>>> Kenn - I don't mean an enum of common closures, I mean expressing 
>>> >>>> closures in a restricted sub-language such as the language of SQL 
>>> >>>> expressions.
>>> >>>
>>> >>>
>>> >>> My lack of clarity: enums was my phrasing of Luke's item 1). I 
>>> >>> understood what you meant. I think either a set of well-known closures 
>>> >>> or a tiny sublanguage could add value.
>>> >>>
>>> >>>>
>>> >>>> That would only work if there is a portable way to interpret SQL 
>>> >>>> expressions, but if there isn't, maybe there should be - for the sake 
>>> >>>> of, well, expressing closures portably. Of course these would be 
>>> >>>> closures that only work with rows - but that seems powerful enough for 
>>> >>>> many if not most purposes.
>>> >>>
>>> >>>
>>> >>> You can choose a SQL dialect or choose the tiniest subset just for this 
>>> >>> purpose and go with it. But when the data types going in or out of the 
>>> >>> lambda are e.g. some Java or Python object then what? One idea is to 
>>> >>> always require these to be rows. But if you can really get away with a 
>>> >>> dependency-free context-free lambda, then Javascript or Python is as 
>>> >>> doable as SQL in terms of having a tiny restricted language for just 
>>> >>> this purpose. I would expect once it got used, folks would start to ask 
>>> >>> to include the rest of what the language has to offer - its ecosystem. 
>>> >>> This is always the main design point I am interested in for 
>>> >>> "lightweight" embedded UDF proposals.
>>> >>
>>> >>
>>> >> This is getting off the topic of ReadAll, but I think being able to do 
>>> >> arbitrary computation in preceding/succeeding transform plus a (quite) 
>>> >> restricted language in the transform itself can go a long way. (For 
>>> >> example, one could have a dynamic destinations write that takes a 
>>> >> KV<element, dest> where dest is a format string like 
>>> >> "foo-{{shard_num}}.txt" to plug in the truly dynamic pieces, but the 
>>> >> dest string itself can be computed (e.g. based on the element) using 
>>> >> arbitrary code in the caller language.)
>>> >>
>>> >>>
>>> >>>
>>> >>> Kenn
>>> >>>
>>> >>>>
>>> >>>> For example, maybe the Java example:
>>> >>>>
>>> >>>>  PCollection<BankTransaction> transactions = ...;
>>> >>>>  transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
>>> >>>>      .by(Transaction::getType)
>>> >>>>      // Convert the data to be written to CSVSink
>>> >>>>      .via(tx -> tx.getType().toFields(tx),
>>> >>>>           type -> new CSVSink(type.getFieldNames()))
>>> >>>>      .to(".../path/to/")
>>> >>>>      .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
>>> >>>>
>>> >>>> could be written in Python as:
>>> >>>>
>>> >>>> transactions | fileio.write_dynamic(
>>> >>>>   # "it" is implicitly available in these SQL expressions as the same
>>> >>>>   # thing as the Java lambda argument
>>> >>>>   by="it.type",
>>> >>>>   format="it.fields",
>>> >>>>   # A bunch of preset sinks supported in every language?
>>> >>>>   sink="CSV_SINK(it.type.field_names)",
>>> >>>>   to=".../path/to/",
>>> >>>>   naming="DEFAULT_NAMING(CONCAT(it, '-transactions'), '.csv')")
>>> >>>>
>>> >>>> Again, to be clear, I'm not suggesting to block what Ismael is 
>>> >>>> proposing on getting this done - getting this done wouldn't be a short 
>>> >>>> term effort, but seems potentially really nice.
>>> >>>>
>>> >>>>
>>> >>>> On Wed, Jul 1, 2020 at 3:19 PM Robert Burke <[email protected]> wrote:
>>> >>>>>
>>> >>>>> From the Go side of the table, the Go language doesn't provide a 
>>> >>>>> mechanism to serialize or access closure data, which means DoFns 
>>> >>>>> can't be functional closures. This, combined with the move to have the 
>>> >>>>> "Structural DoFns" be serialized using Beam Schemas, has the net 
>>> >>>>> result that if Go transforms are used for Cross Language, they will 
>>> >>>>> be configurable with a Schema of the configuration data.
>>> >>>>>
>>> >>>>> Of course, this just means that each language will probably provide 
>>> >>>>> whichever mechanisms it likes for use of its cross language 
>>> >>>>> transforms.
>>> >>>>>
>>> >>>>> On Tue, 30 Jun 2020 at 16:07, Kenneth Knowles <[email protected]> wrote:
>>> >>>>>>
>>> >>>>>> I don't think an enum of most common closures will work. The input 
>>> >>>>>> types are typically generics that are made concrete by the caller 
>>> >>>>>> who also provides the closures. I think Luke's (2) is the same idea 
>>> >>>>>> as my "Java still assembles it [using opaque Python 
>>> >>>>>> closures/transforms]". It seems like an approach to (3). Passing 
>>> >>>>>> over actual code could address some cases, but libraries become the 
>>> >>>>>> issue.
>>> >>>>>>
>>> >>>>>> I think it is fair to say that "WriteAll" style would involve 
>>> >>>>>> entering unexplored territory.
>>> >>>>>>
>>> >>>>>> On the main topic, I think Brian has a pretty strong point and his 
>>> >>>>>> example of type conversion lambdas is a good example. I did a quick 
>>> >>>>>> survey and every other property I could find does seem like it fits 
>>> >>>>>> on the Read, and most IOs have a few of these closures for example 
>>> >>>>>> also extracting timestamps. So maybe just a resolution convention of 
>>> >>>>>> putting them on the ReadAll and that taking precedence. Then you 
>>> >>>>>> would be deserializing a Read transform with insta-crash methods or 
>>> >>>>>> some such?
>>> >>>>>>
>>> >>>>>> Kenn
>>> >>>>>>
>>> >>>>>> On Tue, Jun 30, 2020 at 10:24 AM Eugene Kirpichov 
>>> >>>>>> <[email protected]> wrote:
>>> >>>>>>>
>>> >>>>>>> Yeah, mainly I just feel like dynamic reads and dynamic writes (and 
>>> >>>>>>> perhaps not-yet-invented similar transforms of other kinds) are 
>>> >>>>>>> tightly related - they are either very similar, or are duals of 
>>> >>>>>>> each other - so they should use the same approach. If they are 
>>> >>>>>>> using different approaches, it is a sign that either one of them is 
>>> >>>>>>> being done wrong or that we are running into a fundamental 
>>> >>>>>>> limitation of Beam (e.g. difficulty of encoding closures compared 
>>> >>>>>>> to encoding elements).
>>> >>>>>>>
>>> >>>>>>> But I agree with Luke that we shouldn't give up on closures. 
>>> >>>>>>> Especially with the work that has been done on schemas and SQL, I 
>>> >>>>>>> see no reason why we couldn't express closures in a portable 
>>> >>>>>>> restricted sub-language. If we can express SQL, we can express many 
>>> >>>>>>> or most use cases of dynamic reads/writes - I don't mean that we 
>>> >>>>>>> should actually use SQL (though we could - e.g. SQL scalar 
>>> >>>>>>> expressions seem powerful enough to express the closures appearing 
>>> >>>>>>> in most use cases of FileIO.writeDynamic), I just mean that SQL is 
>>> >>>>>>> an existence proof.
>>> >>>>>>>
>>> >>>>>>> (I don't want to rock the boat too much, just thought I'd chime in 
>>> >>>>>>> as this topic is dear to my heart)
>>> >>>>>>>
>>> >>>>>>> On Tue, Jun 30, 2020 at 9:59 AM Luke Cwik <[email protected]> wrote:
>>> >>>>>>>>
>>> >>>>>>>> Kenn, I'm not too worried about closures since:
>>> >>>>>>>> 1) the expansion service for a transform could have a well-defined set 
>>> >>>>>>>> of closures by name that are returned as serialized objects 
>>> >>>>>>>> that don't need to be interpretable by the caller
>>> >>>>>>>> 2) the language could store serialized functions of another 
>>> >>>>>>>> language as constants
>>> >>>>>>>> 3) generic XLang function support will eventually be needed
>>> >>>>>>>> but I do agree that closures do make things difficult to express 
>>> >>>>>>>> vs data, which is primarily why we should prefer data over 
>>> >>>>>>>> closures when possible and use closures when expressing it with 
>>> >>>>>>>> data would be too cumbersome.
>>> >>>>>>>>
>>> >>>>>>>> Brian, so far the cases that have been migrated have shown that 
>>> >>>>>>>> the source descriptor and the Read transform are almost the same 
>>> >>>>>>>> (some parameters that only impact pipeline construction such as 
>>> >>>>>>>> coders differ).
>>> >>>>>>>>
>>> >>>>>>>> On Mon, Jun 29, 2020 at 2:33 PM Brian Hulette 
>>> >>>>>>>> <[email protected]> wrote:
>>> >>>>>>>>>
>>> >>>>>>>>> Sorry for jumping into this late and casting a vote against the 
>>> >>>>>>>>> consensus... but I think I'd prefer standardizing on a pattern 
>>> >>>>>>>>> like PCollection<KafkaSourceDescriptor> rather than 
>>> >>>>>>>>> PCollection<Read>. That approach clearly separates the parameters 
>>> >>>>>>>>> that are allowed to vary across a ReadAll (the ones defined in 
>>> >>>>>>>>> KafkaSourceDescriptor) from the parameters that should be 
>>> >>>>>>>>> constant (other parameters in the Read object, like 
>>> >>>>>>>>> SerializedFunctions for type conversions, parameters for 
>>> >>>>>>>>> different operating modes, etc...). I think it's helpful to think 
>>> >>>>>>>>> of the parameters that are allowed to vary as some "location 
>>> >>>>>>>>> descriptor", but I imagine IO authors may want other parameters 
>>> >>>>>>>>> to vary across a ReadAll as well.
>>> >>>>>>>>>
>>> >>>>>>>>> To me it seems safer to let an IO author "opt-in" to a parameter 
>>> >>>>>>>>> being dynamic at execution time.
>>> >>>>>>>>>
>>> >>>>>>>>> Brian
>>> >>>>>>>>>
>>> >>>>>>>>> On Mon, Jun 29, 2020 at 9:26 AM Eugene Kirpichov 
>>> >>>>>>>>> <[email protected]> wrote:
>>> >>>>>>>>>>
>>> >>>>>>>>>> I'd like to raise one more time the question of consistency 
>>> >>>>>>>>>> between dynamic reads and dynamic writes, per my email at the 
>>> >>>>>>>>>> beginning of the thread.
>>> >>>>>>>>>> If the community prefers ReadAll to read from Read, then should 
>>> >>>>>>>>>> dynamicWrites write to Write?
>>> >>>>>>>>>>
>>> >>>>>>>>>> On Mon, Jun 29, 2020 at 8:57 AM Boyuan Zhang 
>>> >>>>>>>>>> <[email protected]> wrote:
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> It seems like most of us agree on the idea that ReadAll should 
>>> >>>>>>>>>>> read from Read. I'm going to update the Kafka ReadAll with the 
>>> >>>>>>>>>>> same pattern.
>>> >>>>>>>>>>> Thanks for all your help!
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> On Fri, Jun 26, 2020 at 12:12 PM Chamikara Jayalath 
>>> >>>>>>>>>>> <[email protected]> wrote:
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> On Fri, Jun 26, 2020 at 11:49 AM Luke Cwik <[email protected]> 
>>> >>>>>>>>>>>> wrote:
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> I would also like to suggest that transforms that implement 
>>> >>>>>>>>>>>>> ReadAll via Read should also provide methods like:
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> // Uses the specified values if unspecified in the input 
>>> >>>>>>>>>>>>> element from the PCollection<Read>.
>>> >>>>>>>>>>>>> withDefaults(Read read);
>>> >>>>>>>>>>>>> // Uses the specified values regardless of what the input 
>>> >>>>>>>>>>>>> element from the PCollection<Read> specifies.
>>> >>>>>>>>>>>>> withOverrides(Read read);
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> and only add methods that are required at construction time 
>>> >>>>>>>>>>>>> (e.g. coders). This way the majority of documentation sits on 
>>> >>>>>>>>>>>>> the Read transform.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> +0 from me. Sounds like benefits outweigh the drawbacks here 
>>> >>>>>>>>>>>> and some of the drawbacks related to cross-language can be 
>>> >>>>>>>>>>>> overcome through future advancements.
>>> >>>>>>>>>>>> Thanks for bringing this up Ismaël.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> - Cham
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> On Fri, Jun 26, 2020 at 9:58 AM Luke Cwik <[email protected]> 
>>> >>>>>>>>>>>>> wrote:
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> Ismael, it is good to hear that using Read as the input 
>>> >>>>>>>>>>>>>> didn't have a bunch of parameters that were being 
>>> >>>>>>>>>>>>>> skipped/ignored. Also, for the polymorphism issue you have 
>>> >>>>>>>>>>>>>> to rely on the user correctly telling you the type in such a 
>>> >>>>>>>>>>>>>> way where it is a common ancestor of all the runtime types 
>>> >>>>>>>>>>>>>> that will ever be used. This usually boils down to something 
>>> >>>>>>>>>>>>>> like Serializable or DynamicMessage such that the coder that 
>>> >>>>>>>>>>>>>> is chosen works for all the runtime types. Using multiple 
>>> >>>>>>>>>>>>>> types is a valid use case and would allow for a simpler 
>>> >>>>>>>>>>>>>> graph with fewer flattens merging the output from multiple 
>>> >>>>>>>>>>>>>> sources.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> Boyuan, as you have mentioned we can have a coder for 
>>> >>>>>>>>>>>>>> KafkaIO.Read which uses schemas even if some of the 
>>> >>>>>>>>>>>>>> parameters can't be represented in a meaningful way beyond 
>>> >>>>>>>>>>>>>> "bytes". This would be helpful for cross language as well 
>>> >>>>>>>>>>>>>> since every parameter would become available if a language 
>>> >>>>>>>>>>>>>> could support it (e.g. it could serialize a java function up 
>>> >>>>>>>>>>>>>> front and keep it saved as raw bytes within said language). 
>>> >>>>>>>>>>>>>> Even if we figure out a better way to do this in the future, 
>>> >>>>>>>>>>>>>> we'll have to change the schema for the new way anyway. This 
>>> >>>>>>>>>>>>>> would mean that the external version of the transform adopts 
>>> >>>>>>>>>>>>>> Row to Read and we drop KafkaSourceDescriptor. The 
>>> >>>>>>>>>>>>>> conversion from Row to Read could validate that the 
>>> >>>>>>>>>>>>>> parameters make sense (e.g. the bytes are valid serialized 
>>> >>>>>>>>>>>>>> functions). The addition of an endReadTime/endReadOffset 
>>> >>>>>>>>>>>>>> would make sense for KafkaIO.Read as well and this would 
>>> >>>>>>>>>>>>>> enable having a bounded version that could be used for 
>>> >>>>>>>>>>>>>> backfills (this doesn't have to be done as part of any 
>>> >>>>>>>>>>>>>> current ongoing PR). Essentially any parameter that could be 
>>> >>>>>>>>>>>>>> added for a single instance of a Kafka element+restriction 
>>> >>>>>>>>>>>>>> would also make sense to the KafkaIO.Read transform since it 
>>> >>>>>>>>>>>>>> too is a single instance. There are parameters that would 
>>> >>>>>>>>>>>>>> apply to the ReadAll that wouldn't apply to a read and these 
>>> >>>>>>>>>>>>>> would be global parameters across all element+restriction 
>>> >>>>>>>>>>>>>> pairs such as config overrides or default values.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> I am convinced that we should do as Ismael is suggesting and 
>>> >>>>>>>>>>>>>> use KafkaIO.Read as the type.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:00 PM Chamikara Jayalath 
>>> >>>>>>>>>>>>>> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Discussion regarding cross-language transforms is a slight 
>>> >>>>>>>>>>>>>>> tangent here. But I think, in general, it's great if we can 
>>> >>>>>>>>>>>>>>> use existing transforms (for example, IO connectors) as 
>>> >>>>>>>>>>>>>>> cross-language transforms without having to build more 
>>> >>>>>>>>>>>>>>> composites (irrespective of whether in 
>>> >>>>>>>>>>>>>>> ExternalTransformBuilders or a user pipelines) just to make 
>>> >>>>>>>>>>>>>>> them cross-language compatible. A future cross-language 
>>> >>>>>>>>>>>>>>> compatible SchemaCoder might help (assuming that works for 
>>> >>>>>>>>>>>>>>> Read transform) but I'm not sure we have a good idea when 
>>> >>>>>>>>>>>>>>> we'll get to that state.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Thanks,
>>> >>>>>>>>>>>>>>> Cham
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang 
>>> >>>>>>>>>>>>>>> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> For unbounded SDF in Kafka, we also consider the 
>>> >>>>>>>>>>>>>>>> upgrading/downgrading compatibility in the pipeline update 
>>> >>>>>>>>>>>>>>>> scenario (for a detailed discussion, please refer to 
>>> >>>>>>>>>>>>>>>> https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E).
>>> >>>>>>>>>>>>>>>> To obtain that compatibility, the input of the read SDF 
>>> >>>>>>>>>>>>>>>> must be schema-aware.
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> Thus the major constraint of mapping KafkaSourceDescriptor 
>>> >>>>>>>>>>>>>>>> to PCollection<Read> is that KafkaIO.Read also needs to be 
>>> >>>>>>>>>>>>>>>> schema-aware, otherwise pipeline updates might fail 
>>> >>>>>>>>>>>>>>>> unnecessarily. Looking into KafkaIO.Read, not all 
>>> >>>>>>>>>>>>>>>> necessary fields are compatible with schema, for example, 
>>> >>>>>>>>>>>>>>>> SerializedFunction.
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> I'm kind of confused by why ReadAll<Read, OutputT> is a 
>>> >>>>>>>>>>>>>>>> common pattern for SDF based IO. The Read can be a common 
>>> >>>>>>>>>>>>>>>> pattern because the input is always a PBegin. But for an 
>>> >>>>>>>>>>>>>>>> SDF based IO, the input can be anything. By using Read as 
>>> >>>>>>>>>>>>>>>> input, we will still have the maintenance cost when SDF IO 
>>> >>>>>>>>>>>>>>>> supports a new field but Read doesn't consume it. For 
>>> >>>>>>>>>>>>>>>> example, we are discussing adding endOffset and 
>>> >>>>>>>>>>>>>>>> endReadTime to KafkaSourceDescriptor, which is not used 
>>> >>>>>>>>>>>>>>>> in KafkaIO.Read.
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía 
>>> >>>>>>>>>>>>>>>> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> We forgot to mention (5) External.Config used in 
>>> >>>>>>>>>>>>>>>>> cross-lang, see KafkaIO
>>> >>>>>>>>>>>>>>>>> ExternalTransformBuilder. This approach is the 
>>> >>>>>>>>>>>>>>>>> predecessor of (4) and probably a
>>> >>>>>>>>>>>>>>>>> really good candidate to be replaced by the Row based 
>>> >>>>>>>>>>>>>>>>> Configuration Boyuan is
>>> >>>>>>>>>>>>>>>>> envisioning (so good to be aware of this).
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> Thanks for the clear explanation, Luke; you mention the 
>>> >>>>>>>>>>>>>>>>> real issue(s). All the
>>> >>>>>>>>>>>>>>>>> approaches discussed so far in the end could be easily 
>>> >>>>>>>>>>>>>>>>> transformed to produce a
>>> >>>>>>>>>>>>>>>>> PCollection<Read> and those Read Elements could be read 
>>> >>>>>>>>>>>>>>>>> by the generic ReadAll
>>> >>>>>>>>>>>>>>>>> transform. Notice that this can be internal in some IOs 
>>> >>>>>>>>>>>>>>>>> e.g. KafkaIO if they
>>> >>>>>>>>>>>>>>>>> decide not to expose it. I am not saying that we should 
>>> >>>>>>>>>>>>>>>>> force every IO to
>>> >>>>>>>>>>>>>>>>> support ReadAll in its public API but if we do it is 
>>> >>>>>>>>>>>>>>>>> probably a good idea to be
>>> >>>>>>>>>>>>>>>>> consistent with naming the transform that expects an 
>>> >>>>>>>>>>>>>>>>> input PCollection<Read> in
>>> >>>>>>>>>>>>>>>>> the same way. Also notice that using it will save us from 
>>> >>>>>>>>>>>>>>>>> the maintenance issues
>>> >>>>>>>>>>>>>>>>> discussed in my previous email.
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> Back to the main concern: the consequences of expansion 
>>> >>>>>>>>>>>>>>>>> based on Read: So far I
>>> >>>>>>>>>>>>>>>>> have not seen consequences for the Splitting part which 
>>> >>>>>>>>>>>>>>>>> maps really nicely
>>> >>>>>>>>>>>>>>>>> assuming the Partition info / Restriction is available as 
>>> >>>>>>>>>>>>>>>>> part of Read. So far
>>> >>>>>>>>>>>>>>>>> there are no Serialization issues because Beam is already 
>>> >>>>>>>>>>>>>>>>> enforcing this. Notice that
>>> >>>>>>>>>>>>>>>>> ReadAll expansion is almost ‘equivalent’ to a poor man's 
>>> >>>>>>>>>>>>>>>>> SDF at least for the
>>> >>>>>>>>>>>>>>>>> Bounded case (see the code in my previous email). For the 
>>> >>>>>>>>>>>>>>>>> other points:
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> > a) Do all properties set on a Read apply to the 
>>> >>>>>>>>>>>>>>>>> > ReadAll? For example, the
>>> >>>>>>>>>>>>>>>>> > Kafka Read implementation allows you to set the key and 
>>> >>>>>>>>>>>>>>>>> > value deserializers
>>> >>>>>>>>>>>>>>>>> > which are also used to dictate the output PCollection 
>>> >>>>>>>>>>>>>>>>> > type. It also allows you
>>> >>>>>>>>>>>>>>>>> > to set how the watermark should be computed. 
>>> >>>>>>>>>>>>>>>>> > Technically a user may want the
>>> >>>>>>>>>>>>>>>>> > watermark computation to be configurable per Read and 
>>> >>>>>>>>>>>>>>>>> > they may also want an
>>> >>>>>>>>>>>>>>>>> > output type which is polymorphic (e.g. 
>>> >>>>>>>>>>>>>>>>> > PCollection<Serializable>).
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> Most of the time they do but for parametric types we 
>>> >>>>>>>>>>>>>>>>> cannot support different
>>> >>>>>>>>>>>>>>>>> types in the outputs of the Read or at least I did not 
>>> >>>>>>>>>>>>>>>>> find how to do so (is
>>> >>>>>>>>>>>>>>>>> there a way to use multiple output Coders on Beam?), we 
>>> >>>>>>>>>>>>>>>>> saw this in CassandraIO
>>> >>>>>>>>>>>>>>>>> and we were discussing adding explicitly these Coders or 
>>> >>>>>>>>>>>>>>>>> Serializer
>>> >>>>>>>>>>>>>>>>> specific methods to the ReadAll transform. This is less 
>>> >>>>>>>>>>>>>>>>> nice because it will
>>> >>>>>>>>>>>>>>>>> imply some repeated methods, but it is still a compromise 
>>> >>>>>>>>>>>>>>>>> to gain the other
>>> >>>>>>>>>>>>>>>>> advantages. I suppose the watermark case you mention is 
>>> >>>>>>>>>>>>>>>>> similar because you may
>>> >>>>>>>>>>>>>>>>> want the watermark to behave differently in each Read and 
>>> >>>>>>>>>>>>>>>>> we probably don’t
>>> >>>>>>>>>>>>>>>>> support this, so it corresponds to the polymorphic 
>>> >>>>>>>>>>>>>>>>> category.
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> > b) Read extends PTransform which brings its own object 
>>> >>>>>>>>>>>>>>>>> > modelling concerns.
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> > During the implementations of 
>>> >>>>>>>>>>>>>>>>> > ReadAll(PCollection<Read>), was it discovered
>>> >>>>>>>>>>>>>>>>> > that some properties became runtime errors or were 
>>> >>>>>>>>>>>>>>>>> > ignored if they were set?
>>> >>>>>>>>>>>>>>>>> > If no, then the code deduplication is likely worth it 
>>> >>>>>>>>>>>>>>>>> > because we also get a
>>> >>>>>>>>>>>>>>>>> > lot of javadoc deduplication, but if yes is this an 
>>> >>>>>>>>>>>>>>>>> > acceptable user
>>> >>>>>>>>>>>>>>>>> > experience?
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> No, not so far. This is an interesting part, notice that 
>>> >>>>>>>>>>>>>>>>> the Read translation
>>> >>>>>>>>>>>>>>>>> ends up delegating the read bits to the ReadFn part of 
>>> >>>>>>>>>>>>>>>>> ReadAll so the ReadFn is
>>> >>>>>>>>>>>>>>>>> the real read and must be aware and use all the 
>>> >>>>>>>>>>>>>>>>> parameters.
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>     @Override
>>> >>>>>>>>>>>>>>>>>     public PCollection<SolrDocument> expand(PBegin input) {
>>> >>>>>>>>>>>>>>>>>       return input.apply("Create", Create.of(this))
>>> >>>>>>>>>>>>>>>>>           .apply("ReadAll", readAll());
>>> >>>>>>>>>>>>>>>>>     }
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> I might be missing something for the Unbounded SDF case 
>>> >>>>>>>>>>>>>>>>> which is the only case
>>> >>>>>>>>>>>>>>>>> we have not explored so far. I think one easy way to see 
>>> >>>>>>>>>>>>>>>>> the limitations would
>>> >>>>>>>>>>>>>>>>> be in the ongoing KafkaIO SDF based implementation to try 
>>> >>>>>>>>>>>>>>>>> to map
>>> >>>>>>>>>>>>>>>>> KafkaSourceDescriptor to do the extra PCollection<Read> 
>>> >>>>>>>>>>>>>>>>> and the Read logic on
>>> >>>>>>>>>>>>>>>>> the ReadAll with the SDF to see which constraints we hit, 
>>> >>>>>>>>>>>>>>>>> the polymorphic ones
>>> >>>>>>>>>>>>>>>>> will be there for sure, maybe others will appear (not 
>>> >>>>>>>>>>>>>>>>> sure). However it would be
>>> >>>>>>>>>>>>>>>>> interesting to see if we have a real gain in the 
>>> >>>>>>>>>>>>>>>>> maintenance points, but well
>>> >>>>>>>>>>>>>>>>> let’s not forget also that KafkaIO has a LOT of knobs so 
>>> >>>>>>>>>>>>>>>>> probably the generic
>>> >>>>>>>>>>>>>>>>> implementation could be relatively complex.
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik 
>>> >>>>>>>>>>>>>>>>> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >
>>> >>>>>>>>>>>>>>>>> > I had mentioned that approach 1 and approach 2 work for 
>>> >>>>>>>>>>>>>>>>> > cross language. The difference being that the cross 
>>> >>>>>>>>>>>>>>>>> > language transform would take a well known definition 
>>> >>>>>>>>>>>>>>>>> > and convert it to the Read transform. A normal user 
>>> >>>>>>>>>>>>>>>>> > would have a pipeline that would look like:
>>> >>>>>>>>>>>>>>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> 
>>> >>>>>>>>>>>>>>>>> > PCollection<Output>
>>> >>>>>>>>>>>>>>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) 
>>> >>>>>>>>>>>>>>>>> > -> PCollection<Output>
>>> >>>>>>>>>>>>>>>>> >
>>> >>>>>>>>>>>>>>>>> > And in the cross language case this would look like:
>>> >>>>>>>>>>>>>>>>> > 1: PCollection<Row of SourceDescriptor> -> 
>>> >>>>>>>>>>>>>>>>> > PTransform(Convert Row to Read) -> PCollection<Read> -> 
>>> >>>>>>>>>>>>>>>>> > PTransform(ReadAll) -> PCollection<Output>
>>> >>>>>>>>>>>>>>>>> > 2: PCollection<Row of SourceDescriptor> -> 
>>> >>>>>>>>>>>>>>>>> > PTransform(Convert Row to SourceDescriptor) -> 
>>> >>>>>>>>>>>>>>>>> > PCollection<SourceDescriptor> -> PTransform(ReadAll) -> 
>>> >>>>>>>>>>>>>>>>> > PCollection<Output>*
>>> >>>>>>>>>>>>>>>>> > * note that PTransform(Convert Row to SourceDescriptor) 
>>> >>>>>>>>>>>>>>>>> > only exists since we haven't solved how to use schemas 
>>> >>>>>>>>>>>>>>>>> > with language bound types in a cross language way. 
>>> >>>>>>>>>>>>>>>>> > SchemaCoder isn't portable but RowCoder is which is why 
>>> >>>>>>>>>>>>>>>>> > the conversion step exists. We could have a solution 
>>> >>>>>>>>>>>>>>>>> > for this at some point in time.
>>> >>>>>>>>>>>>>>>>> >
>>> >>>>>>>>>>>>>>>>> > My concern with using Read was around:
>>> >>>>>>>>>>>>>>>>> > a) Do all properties set on a Read apply to the 
>>> >>>>>>>>>>>>>>>>> > ReadAll? For example, the Kafka Read implementation 
>>> >>>>>>>>>>>>>>>>> > allows you to set the key and value deserializers which 
>>> >>>>>>>>>>>>>>>>> > are also used to dictate the output PCollection type. 
>>> >>>>>>>>>>>>>>>>> > It also allows you to set how the watermark should be 
>>> >>>>>>>>>>>>>>>>> > computed. Technically a user may want the watermark 
>>> >>>>>>>>>>>>>>>>> > computation to be configurable per Read and they may 
>>> >>>>>>>>>>>>>>>>> > also want an output type which is polymorphic (e.g. 
>>> >>>>>>>>>>>>>>>>> > PCollection<Serializable>).
>>> >>>>>>>>>>>>>>>>> > b) Read extends PTransform which brings its own object 
>>> >>>>>>>>>>>>>>>>> > modelling concerns.
>>> >>>>>>>>>>>>>>>>> >
>>> >>>>>>>>>>>>>>>>> > During the implementations of 
>>> >>>>>>>>>>>>>>>>> > ReadAll(PCollection<Read>), was it discovered that some 
>>> >>>>>>>>>>>>>>>>> > properties became runtime errors or were ignored if 
>>> >>>>>>>>>>>>>>>>> > they were set? If no, then the code deduplication is 
>>> >>>>>>>>>>>>>>>>> > likely worth it because we also get a lot of javadoc 
>>> >>>>>>>>>>>>>>>>> > deduplication, but if yes is this an acceptable user 
>>> >>>>>>>>>>>>>>>>> > experience?
>>> >>>>>>>>>>>>>>>>> >
>>> >>>>>>>>>>>>>>>>> >
>>> >>>>>>>>>>>>>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko 
>>> >>>>>>>>>>>>>>>>> > <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >>
>>> >>>>>>>>>>>>>>>>> >> I believe that the initial goal of unifying ReadAll as 
>>> >>>>>>>>>>>>>>>>> >> a general "PTransform<PCollection<Read>, 
>>> >>>>>>>>>>>>>>>>> >> PCollection<OutputType>>" was to reduce the amount of 
>>> >>>>>>>>>>>>>>>>> >> code duplication and the error-prone approach that comes 
>>> >>>>>>>>>>>>>>>>> >> with it. It makes a lot of sense, since usually we have all 
>>> >>>>>>>>>>>>>>>>> >> the needed configuration set in Read objects and, as 
>>> >>>>>>>>>>>>>>>>> >> Ismaël mentioned, ReadAll will consist mostly of only 
>>> >>>>>>>>>>>>>>>>> >> Split-Shuffle-Read stages. So this case can usually 
>>> >>>>>>>>>>>>>>>>> >> be unified by using PCollection<Read> as input.
>>> >>>>>>>>>>>>>>>>> >>
>>> >>>>>>>>>>>>>>>>> >> On the other hand, we have another need: to use Java
>>> >>>>>>>>>>>>>>>>> >> IOs as cross-language transforms (as Luke described),
>>> >>>>>>>>>>>>>>>>> >> which seems to have only partly in common with the
>>> >>>>>>>>>>>>>>>>> >> previous pattern of ReadAll usage.
>>> >>>>>>>>>>>>>>>>> >>
>>> >>>>>>>>>>>>>>>>> >> I’d prefer to have only one concept of read
>>> >>>>>>>>>>>>>>>>> >> configuration for all needs, but it seems that’s not
>>> >>>>>>>>>>>>>>>>> >> easy, so I’d be more in favour of Luke and Boyuan's
>>> >>>>>>>>>>>>>>>>> >> approach with schema. Though maybe ReadAll is not a
>>> >>>>>>>>>>>>>>>>> >> very suitable name in this case, because it can
>>> >>>>>>>>>>>>>>>>> >> cause some confusion with the previous pattern of
>>> >>>>>>>>>>>>>>>>> >> ReadAll uses.
>>> >>>>>>>>>>>>>>>>> >>
>>> >>>>>>>>>>>>>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang 
>>> >>>>>>>>>>>>>>>>> >> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >>
>>> >>>>>>>>>>>>>>>>> >> Sorry for the typo. I mean I think we can go with (3) 
>>> >>>>>>>>>>>>>>>>> >> and (4): use the data type that is schema-aware as the 
>>> >>>>>>>>>>>>>>>>> >> input of ReadAll.
>>> >>>>>>>>>>>>>>>>> >>
>>> >>>>>>>>>>>>>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang 
>>> >>>>>>>>>>>>>>>>> >> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >>>
>>> >>>>>>>>>>>>>>>>> >>> Thanks for the summary, Cham!
>>> >>>>>>>>>>>>>>>>> >>>
>>> >>>>>>>>>>>>>>>>> >>> I think we can go with (2) and (4): use the data type 
>>> >>>>>>>>>>>>>>>>> >>> that is schema-aware as the input of ReadAll.
>>> >>>>>>>>>>>>>>>>> >>>
>>> >>>>>>>>>>>>>>>>> >>> Converting Read into ReadAll helps us to stick with
>>> >>>>>>>>>>>>>>>>> >>> SDF-like IO. But having only (3) is not enough to
>>> >>>>>>>>>>>>>>>>> >>> solve the problem of using ReadAll in the x-lang case.
>>> >>>>>>>>>>>>>>>>> >>>
>>> >>>>>>>>>>>>>>>>> >>> The key point of ReadAll is that the input type of
>>> >>>>>>>>>>>>>>>>> >>> ReadAll should be able to cross language boundaries
>>> >>>>>>>>>>>>>>>>> >>> and stay compatible across updates/downgrades.
>>> >>>>>>>>>>>>>>>>> >>> After investigating some possibilities (pure Java POJO
>>> >>>>>>>>>>>>>>>>> >>> with custom coder, protobuf, row/schema) in Kafka
>>> >>>>>>>>>>>>>>>>> >>> usage, we found that row/schema fits our needs best.
>>> >>>>>>>>>>>>>>>>> >>> Here comes (4). I believe that using Read as input of
>>> >>>>>>>>>>>>>>>>> >>> ReadAll makes sense in some cases, but I also think
>>> >>>>>>>>>>>>>>>>> >>> not all IOs have the same need. I would treat Read as
>>> >>>>>>>>>>>>>>>>> >>> a special type as long as the Read is schema-aware.
>>> >>>>>>>>>>>>>>>>> >>>
>>> >>>>>>>>>>>>>>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath 
>>> >>>>>>>>>>>>>>>>> >>> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >>>>
>>> >>>>>>>>>>>>>>>>> >>>> I see. So it seems like there are three options 
>>> >>>>>>>>>>>>>>>>> >>>> discussed so far when it comes to defining source 
>>> >>>>>>>>>>>>>>>>> >>>> descriptors for ReadAll type transforms
>>> >>>>>>>>>>>>>>>>> >>>>
>>> >>>>>>>>>>>>>>>>> >>>> (1) Use Read PTransform as the element type of the 
>>> >>>>>>>>>>>>>>>>> >>>> input PCollection
>>> >>>>>>>>>>>>>>>>> >>>> (2) Use a POJO that describes the source as the data 
>>> >>>>>>>>>>>>>>>>> >>>> element of the input PCollection
>>> >>>>>>>>>>>>>>>>> >>>> (3) Provide a converter as a function to the Read 
>>> >>>>>>>>>>>>>>>>> >>>> transform which essentially will convert it to a 
>>> >>>>>>>>>>>>>>>>> >>>> ReadAll (what Eugene mentioned)
>>> >>>>>>>>>>>>>>>>> >>>>
>>> >>>>>>>>>>>>>>>>> >>>> I feel like (3) is more suitable for a related set
>>> >>>>>>>>>>>>>>>>> >>>> of source descriptions such as files.
>>> >>>>>>>>>>>>>>>>> >>>> (1) will allow the most code reuse but seems like it
>>> >>>>>>>>>>>>>>>>> >>>> will make it hard to use the ReadAll transform as a
>>> >>>>>>>>>>>>>>>>> >>>> cross-language transform and will break the
>>> >>>>>>>>>>>>>>>>> >>>> separation of construction-time and runtime
>>> >>>>>>>>>>>>>>>>> >>>> constructs.
>>> >>>>>>>>>>>>>>>>> >>>> (2) could result in less code reuse if we are not
>>> >>>>>>>>>>>>>>>>> >>>> careful but will make the transform easier to use as
>>> >>>>>>>>>>>>>>>>> >>>> a cross-language transform without additional
>>> >>>>>>>>>>>>>>>>> >>>> modifications.
>>> >>>>>>>>>>>>>>>>> >>>>
>>> >>>>>>>>>>>>>>>>> >>>> Also, with SDF, we can create ReadAll-like 
>>> >>>>>>>>>>>>>>>>> >>>> transforms that are more efficient. So we might be 
>>> >>>>>>>>>>>>>>>>> >>>> able to just define all sources in that format and 
>>> >>>>>>>>>>>>>>>>> >>>> make Read transforms just an easy to use composite 
>>> >>>>>>>>>>>>>>>>> >>>> built on top of that (by adding a preceding Create 
>>> >>>>>>>>>>>>>>>>> >>>> transform).
>>> >>>>>>>>>>>>>>>>> >>>>
>>> >>>>>>>>>>>>>>>>> >>>> Thanks,
>>> >>>>>>>>>>>>>>>>> >>>> Cham
>>> >>>>>>>>>>>>>>>>> >>>>
>>> >>>>>>>>>>>>>>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik 
>>> >>>>>>>>>>>>>>>>> >>>> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >>>>>
>>> >>>>>>>>>>>>>>>>> >>>>> I believe we do require PTransforms to be 
>>> >>>>>>>>>>>>>>>>> >>>>> serializable since anonymous DoFns typically 
>>> >>>>>>>>>>>>>>>>> >>>>> capture the enclosing PTransform.
>>> >>>>>>>>>>>>>>>>> >>>>>
>>> >>>>>>>>>>>>>>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath 
>>> >>>>>>>>>>>>>>>>> >>>>> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>> Seems like Read in PCollection<Read> refers to a 
>>> >>>>>>>>>>>>>>>>> >>>>>> transform, at least here: 
>>> >>>>>>>>>>>>>>>>> >>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>> >>>>>>>>>>>>>>>>> >>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>> I'm in favour of separating construction time
>>> >>>>>>>>>>>>>>>>> >>>>>> transforms from execution time data objects that
>>> >>>>>>>>>>>>>>>>> >>>>>> we store in PCollections as Luke mentioned. Also,
>>> >>>>>>>>>>>>>>>>> >>>>>> we don't guarantee that PTransform is serializable
>>> >>>>>>>>>>>>>>>>> >>>>>> so users have the additional complexity of
>>> >>>>>>>>>>>>>>>>> >>>>>> providing a coder whenever a PTransform is used
>>> >>>>>>>>>>>>>>>>> >>>>>> as a data object.
>>> >>>>>>>>>>>>>>>>> >>>>>> Also, agree with Boyuan that using simple Java
>>> >>>>>>>>>>>>>>>>> >>>>>> objects that are convertible to Beam Rows allows us
>>> >>>>>>>>>>>>>>>>> >>>>>> to make these transforms available to other SDKs
>>> >>>>>>>>>>>>>>>>> >>>>>> through the cross-language transforms. Using
>>> >>>>>>>>>>>>>>>>> >>>>>> transforms or complex sources as data objects will
>>> >>>>>>>>>>>>>>>>> >>>>>> probably make this difficult.
>>> >>>>>>>>>>>>>>>>> >>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>> Thanks,
>>> >>>>>>>>>>>>>>>>> >>>>>> Cham
>>> >>>>>>>>>>>>>>>>> >>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang 
>>> >>>>>>>>>>>>>>>>> >>>>>> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>> Hi Ismael,
>>> >>>>>>>>>>>>>>>>> >>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>> I think the ReadAll in the IO connector refers to
>>> >>>>>>>>>>>>>>>>> >>>>>>> the IO with SDF implementation regardless of the
>>> >>>>>>>>>>>>>>>>> >>>>>>> type of input, whereas Read refers to
>>> >>>>>>>>>>>>>>>>> >>>>>>> UnboundedSource. One major pushback against using
>>> >>>>>>>>>>>>>>>>> >>>>>>> KafkaIO.Read as the source description is that not
>>> >>>>>>>>>>>>>>>>> >>>>>>> all configurations of KafkaIO.Read are meaningful
>>> >>>>>>>>>>>>>>>>> >>>>>>> to populate at execution time. Also, when thinking
>>> >>>>>>>>>>>>>>>>> >>>>>>> about x-lang usage, making the source description
>>> >>>>>>>>>>>>>>>>> >>>>>>> cross language boundaries is also necessary. As
>>> >>>>>>>>>>>>>>>>> >>>>>>> Luke mentioned, it's quite easy to infer a Schema
>>> >>>>>>>>>>>>>>>>> >>>>>>> from an AutoValue object:
>>> >>>>>>>>>>>>>>>>> >>>>>>> KafkaSourceDescription.java.
>>> >>>>>>>>>>>>>>>>> >>>>>>> Then the coder of this schema-aware object will
>>> >>>>>>>>>>>>>>>>> >>>>>>> be a SchemaCoder. When crossing language
>>> >>>>>>>>>>>>>>>>> >>>>>>> boundaries, it's also easy to convert a Row into
>>> >>>>>>>>>>>>>>>>> >>>>>>> the source description: Convert.fromRows.
>>> >>>>>>>>>>>>>>>>> >>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>
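The descriptor-to-Row round trip described above can be sketched in plain Java without any Beam dependency. This is only an illustration under stated assumptions: a Map stands in for a Beam Row, and the class and field names (SourceDescriptor, topic, partition, startOffset) are hypothetical, not the actual KafkaIO types. In Beam the schema would be inferred rather than hand-written as it is here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DescriptorRowDemo {

  /** Hypothetical source descriptor; NOT the actual KafkaIO class. */
  static final class SourceDescriptor {
    final String topic;
    final int partition;
    final Long startOffset; // optional field, may be null

    SourceDescriptor(String topic, int partition, Long startOffset) {
      this.topic = topic;
      this.partition = partition;
      this.startOffset = startOffset;
    }

    /** Flattens the descriptor into a language-neutral row of named fields. */
    Map<String, Object> toRow() {
      Map<String, Object> row = new LinkedHashMap<>();
      row.put("topic", topic);
      row.put("partition", partition);
      row.put("startOffset", startOffset);
      return row;
    }

    /** Rebuilds the descriptor; a missing optional field is read as null. */
    static SourceDescriptor fromRow(Map<String, Object> row) {
      return new SourceDescriptor(
          (String) row.get("topic"),
          (Integer) row.get("partition"),
          (Long) row.get("startOffset"));
    }
  }

  public static void main(String[] args) {
    SourceDescriptor original = new SourceDescriptor("orders", 3, 42L);
    SourceDescriptor copy = SourceDescriptor.fromRow(original.toRow());
    System.out.println(copy.topic + "/" + copy.partition + "@" + copy.startOffset);
  }
}
```

Because the row only carries named, primitive-like fields, another SDK could produce it without knowing the Java class, which is the property the thread is after.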
>>> >>>>>>>>>>>>>>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik 
>>> >>>>>>>>>>>>>>>>> >>>>>>> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>> To provide additional context, the KafkaIO 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> ReadAll transform takes a 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> PCollection<KafkaSourceDescriptor>. This 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> KafkaSourceDescriptor is a POJO that contains 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> the configurable parameters for reading from 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> Kafka. This is different from the pattern that
>>> >>>>>>>>>>>>>>>>> >>>>>>>> Ismael listed, where ReadAll takes
>>> >>>>>>>>>>>>>>>>> >>>>>>>> PCollection<Read> as input and the Read is the
>>> >>>>>>>>>>>>>>>>> >>>>>>>> same Read PTransform class used for the
>>> >>>>>>>>>>>>>>>>> >>>>>>>> non-ReadAll case.
>>> >>>>>>>>>>>>>>>>> >>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>> The KafkaSourceDescriptor does lead to 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> duplication since parameters used to configure 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> the transform have to be copied over to the 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> source descriptor but decouples how a transform 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> is specified from the object that describes what 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> needs to be done. I believe Ismael's point is 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> that we wouldn't need such a decoupling.
>>> >>>>>>>>>>>>>>>>> >>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>> Another area that hasn't been discussed and I 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> believe is a non-issue is that the Beam Java SDK 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> has the most IO connectors and we would want to 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> use the IO implementations within Beam Go and 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> Beam Python. This brings in its own set of 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> issues related to versioning and compatibility 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> for the wire format and how one parameterizes 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> such transforms. The wire format issue can be 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> solved with either approach by making sure that 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> the cross language expansion always takes the 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> well known format (whatever it may be) and 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> converts it into Read/KafkaSourceDescriptor/... 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> object that is then passed to the ReadAll 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> transform. Boyuan has been looking to make the 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> KafkaSourceDescriptor have a schema so it can be 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> represented as a row and this can be done easily 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> using the AutoValue integration (I don't believe 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> there is anything preventing someone from 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> writing a schema row -> Read -> row adapter or 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> also using the AutoValue configuration if the 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> transform is also an AutoValue).
>>> >>>>>>>>>>>>>>>>> >>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>> I would be more for the code duplication and 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> separation of concerns provided by using a 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> different object to represent the contents of 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> the PCollection from the pipeline construction 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> time PTransform.
>>> >>>>>>>>>>>>>>>>> >>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> Hi Ismael,
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> Thanks for taking this on. Have you considered
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> an approach similar (or dual) to
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> FileIO.write(), where we in a sense also have
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> to configure a dynamic number of different IO
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> transforms of the same type (file writes)?
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> E.g. how in this example we configure many 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> aspects of many file writes:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>      .by(Transaction::getType)
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>      // Convert the data to be written to CSVSink
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>      .via(tx -> tx.getType().toFields(tx),
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>           type -> new CSVSink(type.getFieldNames()))
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>      .to(".../path/to/")
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>      .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> we could do something similar for many JdbcIO 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> reads:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> // user-specific type from which all the read parameters can be inferred
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> PCollection<Bar> bars;
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>   .fromQuery(bar -> ...compute query for this bar...)
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>   .withBatchSize(bar -> ...compute batch size for this bar...)
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>   ...etc);
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>
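To make the "every parameter is a function of the element" idea concrete, here is a minimal plain-Java sketch with no Beam or JDBC dependency. Everything in it is an assumption made for illustration: the ReadAll class, its fromQuery/withMapper/expand methods, and the runQuery function that stands in for a database call are hypothetical and are not the JdbcIO API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

class DynamicReadAllDemo {

  /** Hypothetical transform: every read parameter is computed from the input element. */
  static final class ReadAll<ParamT, OutT> {
    private final Function<ParamT, String> queryFn;
    private final BiFunction<ParamT, String, OutT> mapper;

    private ReadAll(Function<ParamT, String> queryFn, BiFunction<ParamT, String, OutT> mapper) {
      this.queryFn = queryFn;
      this.mapper = mapper;
    }

    /** Starts a ReadAll whose query is derived per element; rows pass through unchanged. */
    static <P> ReadAll<P, String> fromQuery(Function<P, String> queryFn) {
      return new ReadAll<P, String>(queryFn, (param, row) -> row);
    }

    /** Replaces the row mapper, which also sees the element that triggered the read. */
    <NewT> ReadAll<ParamT, NewT> withMapper(BiFunction<ParamT, String, NewT> newMapper) {
      return new ReadAll<ParamT, NewT>(queryFn, newMapper);
    }

    /** Runs one query per element; runQuery stands in for a real database call. */
    List<OutT> expand(List<ParamT> params, Function<String, List<String>> runQuery) {
      List<OutT> out = new ArrayList<>();
      for (ParamT param : params) {
        for (String row : runQuery.apply(queryFn.apply(param))) {
          out.add(mapper.apply(param, row));
        }
      }
      return out;
    }
  }

  public static void main(String[] args) {
    ReadAll<String, String> readAll =
        ReadAll.<String>fromQuery(table -> "SELECT * FROM " + table)
            .withMapper((table, row) -> table + "|" + row);
    // Fake database that echoes the query back as its only row.
    System.out.println(readAll.expand(Arrays.asList("t1", "t2"), query -> Arrays.asList(query)));
  }
}
```

The input element type stays entirely user-defined in this shape, so no Read object or descriptor class ever has to be encoded as a PCollection element.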
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Hello,
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> (my excuses for the long email but this 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> requires context)
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> As part of the move from Source-based IOs to
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> DoFn-based ones, one pattern emerged due to the
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> composable nature of DoFn. The idea is to have
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> a different kind of composable reads where we
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> take a PCollection of different sorts of
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> intermediate specifications, e.g. tables,
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> queries, etc. For example:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> JdbcIO:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<ParameterT>, 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<OutputT>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> RedisIO:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<String>, 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<KV<String, String>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<HBaseQuery>, 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<Result>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> These patterns enabled richer use cases like 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> doing multiple queries in the same
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Pipeline, querying based on key patterns or 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> querying from multiple tables at the
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> same time but came with some maintenance 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> issues:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> - We ended up needing to add to the ReadAll
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   transforms the parameters for missing
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   information, so we ended up with lots of
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   duplicated `with` methods and error-prone
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   code copied from the Read transforms into
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   the ReadAll transforms.
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> - When you require new parameters you have to
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   expand the input parameters of the
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   intermediary specification into something
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   that resembles the full `Read` definition.
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   For example, imagine you want to read from
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   multiple tables or servers as part of the
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   same pipeline but this was not in the
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   intermediate specification: you end up
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   adding those extra methods (duplicating
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   more code) just to get close to the full
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   Read spec.
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> - If new parameters are added to the Read
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   method we end up adding them systematically
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   to the ReadAll transform too so they are
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   taken into account.
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Due to these issues I recently did a change to 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> test a new approach that is
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> simpler, more complete and maintainable. The 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> code became:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>, 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<Result>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> With this approach users gain the benefits of
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> improvements to the parameters of the normal
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Read because they can count on the full Read
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> parameters. But of course there are some minor
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> caveats:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> 1. You need to push some information into
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    normal Reads, for example partition
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    boundaries information or Restriction
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    information (in the SDF case). Notice that
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    this consistent approach of ReadAll
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    produces a simple pattern that ends up
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    being almost reusable between IOs (e.g.
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    the non-SDF case):
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   public static class ReadAll
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>       extends PTransform<PCollection<Read>, PCollection<SolrDocument>> {
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>     @Override
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>     public PCollection<SolrDocument> expand(PCollection<Read> input) {
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>       return input
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>     }
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   }
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> 2. If you are using generic types for the
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    results of ReadAll, you must have the
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    Coders used in its definition and require
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    consistent types from the data sources. In
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    practice this means we need to add extra
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    withCoder method(s) on ReadAll, but not
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    the full specs.
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
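The Split, Reshuffle, Read shape above can be mimicked in plain Java to show why the pattern is nearly the same for every IO. This is a sketch under stated assumptions, not Beam code: ReadSpec, split, read and readAll are hypothetical names, lists stand in for PCollections, a seeded shuffle stands in for Reshuffle.viaRandomKey(), and "reading" just emits one string per key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

class SplitShuffleReadDemo {

  /** Hypothetical read spec: a table name plus a key range [start, end). */
  static final class ReadSpec {
    final String table;
    final int start;
    final int end;

    ReadSpec(String table, int start, int end) {
      this.table = table;
      this.start = start;
      this.end = end;
    }
  }

  /** "Split": break one spec into sub-ranges of at most chunk keys each. */
  static List<ReadSpec> split(ReadSpec spec, int chunk) {
    List<ReadSpec> parts = new ArrayList<>();
    for (int lo = spec.start; lo < spec.end; lo += chunk) {
      parts.add(new ReadSpec(spec.table, lo, Math.min(lo + chunk, spec.end)));
    }
    return parts;
  }

  /** "Read": stand-in for querying a real store; emits one record per key. */
  static List<String> read(ReadSpec spec) {
    List<String> out = new ArrayList<>();
    for (int key = spec.start; key < spec.end; key++) {
      out.add(spec.table + ":" + key);
    }
    return out;
  }

  /** Chains the three stages over a whole collection of specs. */
  static List<String> readAll(List<ReadSpec> specs, int chunk, long seed) {
    List<ReadSpec> parts = new ArrayList<>();
    for (ReadSpec spec : specs) {
      parts.addAll(split(spec, chunk));
    }
    // "Reshuffle": redistribute the sub-ranges so no reader is tied to one source.
    Collections.shuffle(parts, new Random(seed));
    List<String> out = new ArrayList<>();
    for (ReadSpec part : parts) {
      out.addAll(read(part));
    }
    return out;
  }

  public static void main(String[] args) {
    List<ReadSpec> specs =
        Arrays.asList(new ReadSpec("users", 0, 5), new ReadSpec("orders", 10, 13));
    System.out.println(readAll(specs, 2, 7L));
  }
}
```

Only split and read are specific to the data store; the chaining in readAll is what stays the same from one IO to the next.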
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> follow this ReadAll pattern. RedisIO and
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> CassandraIO already have WIP PRs to do so. So
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> I wanted to bring this subject to the mailing
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> list to hear your opinions, and to learn
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> whether you see any issues that we might be
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> missing with this idea.
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Also I would like to see if we have consensus
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> to start consistently using the terminology of
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll transforms based on Read and the
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> readAll() method for new IOs (at this point
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> undoing this in the only remaining
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> inconsistent place, JdbcIO, might not be a
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> good idea, but apart from this we should be
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ok).
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> I mention this because the recent PR on
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> KafkaIO based on SDF is doing something
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> similar to the old pattern but is called
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll, and maybe it is worth being
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> consistent for the benefit of users.
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Regards,
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Ismaël
>>> >>>>>>>>>>>>>>>>> >>
>>> >>>>>>>>>>>>>>>>> >>
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik 
>>> >>>>>>>>>>>>>>>>> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >
>>> >>>>>>>>>>>>>>>>> > I had mentioned that approach 1 and approach 2 work for 
>>> >>>>>>>>>>>>>>>>> > cross language. The difference being that the cross 
>>> >>>>>>>>>>>>>>>>> > language transform would take a well known definition 
>>> >>>>>>>>>>>>>>>>> > and convert it to the Read transform. A normal user 
>>> >>>>>>>>>>>>>>>>> > would have a pipeline that would look like:
>>> >>>>>>>>>>>>>>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> 
>>> >>>>>>>>>>>>>>>>> > PCollection<Output>
>>> >>>>>>>>>>>>>>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) 
>>> >>>>>>>>>>>>>>>>> > -> PCollection<Output>
>>> >>>>>>>>>>>>>>>>> >
>>> >>>>>>>>>>>>>>>>> > And in the cross language case this would look like:
>>> >>>>>>>>>>>>>>>>> > 1: PCollection<Row of SourceDescriptor> -> 
>>> >>>>>>>>>>>>>>>>> > PTransform(Convert Row to Read) -> PCollection<Read> -> 
>>> >>>>>>>>>>>>>>>>> > PTransform(ReadAll) -> PCollection<Output>
>>> >>>>>>>>>>>>>>>>> > 2: PCollection<Row of SourceDescriptor> -> 
>>> >>>>>>>>>>>>>>>>> > PTransform(Convert Row to SourceDescriptor) -> 
>>> >>>>>>>>>>>>>>>>> > PCollection<SourceDescriptor> -> PTransform(ReadAll) -> 
>>> >>>>>>>>>>>>>>>>> > PCollection<Output>*
>>> >>>>>>>>>>>>>>>>> > * note that PTransform(Convert Row to SourceDescriptor) 
>>> >>>>>>>>>>>>>>>>> > only exists since we haven't solved how to use schemas 
>>> >>>>>>>>>>>>>>>>> > with language bound types in a cross language way. 
>>> >>>>>>>>>>>>>>>>> > SchemaCoder isn't portable but RowCoder is which is why 
>>> >>>>>>>>>>>>>>>>> > the conversion step exists. We could have a solution 
>>> >>>>>>>>>>>>>>>>> > for this at some point in time.
>>> >>>>>>>>>>>>>>>>> >
>>> >>>>>>>>>>>>>>>>> > My concern with using Read was around:
>>> >>>>>>>>>>>>>>>>> > a) Do all properties set on a Read apply to the 
>>> >>>>>>>>>>>>>>>>> > ReadAll? For example, the Kafka Read implementation 
>>> >>>>>>>>>>>>>>>>> > allows you to set the key and value deserializers which 
>>> >>>>>>>>>>>>>>>>> > are also used to dictate the output PCollection type. 
>>> >>>>>>>>>>>>>>>>> > It also allows you to set how the watermark should be 
>>> >>>>>>>>>>>>>>>>> > computed. Technically a user may want the watermark 
>>> >>>>>>>>>>>>>>>>> > computation to be configurable per Read and they may 
>>> >>>>>>>>>>>>>>>>> > also want an output type which is polymorphic (e.g. 
>>> >>>>>>>>>>>>>>>>> > PCollection<Serializable>).
>>> >>>>>>>>>>>>>>>>> > b) Read extends PTransform which brings its own object 
>>> >>>>>>>>>>>>>>>>> > modelling concerns.
>>> >>>>>>>>>>>>>>>>> >
>>> >>>>>>>>>>>>>>>>> > During the implementations of 
>>> >>>>>>>>>>>>>>>>> > ReadAll(PCollection<Read>), was it discovered that some 
>>> >>>>>>>>>>>>>>>>> > properties became runtime errors or were ignored if 
>>> >>>>>>>>>>>>>>>>> > they were set? If no, then the code deduplication is 
>>> >>>>>>>>>>>>>>>>> > likely worth it because we also get a lot of javadoc 
>>> >>>>>>>>>>>>>>>>> > deduplication, but if yes is this an acceptable user 
>>> >>>>>>>>>>>>>>>>> > experience?
>>> >>>>>>>>>>>>>>>>> >
>>> >>>>>>>>>>>>>>>>> >
>>> >>>>>>>>>>>>>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko 
>>> >>>>>>>>>>>>>>>>> > <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >>
>>> >>>>>>>>>>>>>>>>> >> I believe that the initial goal of unifying ReadAll as 
>>> >>>>>>>>>>>>>>>>> >> a general "PTransform<PCollection<Read>, 
>>> >>>>>>>>>>>>>>>>> >> PCollection<OutputType>>” was to reduce the amount of 
>>> >>>>>>>>>>>>>>>>> >> code duplication and error-prone approach related to 
>>> >>>>>>>>>>>>>>>>> >> this. It makes much sense since usually we have all 
>>> >>>>>>>>>>>>>>>>> >> needed configuration set in Read objects and, as 
>>> >>>>>>>>>>>>>>>>> >> Ismaeil mentioned, ReadAll will consist mostly of only 
>>> >>>>>>>>>>>>>>>>> >> Split-Shuffle-Read stages.  So this case usually can 
>>> >>>>>>>>>>>>>>>>> >> be unified by using PCollection<Read> as input.
>>> >>>>>>>>>>>>>>>>> >>
>>> >>>>>>>>>>>>>>>>> >> On the other hand, we have another need to use Java 
>>> >>>>>>>>>>>>>>>>> >> IOs as cross-language transforms (as Luke described) 
>>> >>>>>>>>>>>>>>>>> >> which seems only partly in common with previous 
>>> >>>>>>>>>>>>>>>>> >> pattern of ReadAll using.
>>> >>>>>>>>>>>>>>>>> >>
>>> >>>>>>>>>>>>>>>>> >> I’d be more in favour to have only one concept of read 
>>> >>>>>>>>>>>>>>>>> >> configuration for all needs but seems it’s not easy 
>>> >>>>>>>>>>>>>>>>> >> and I’d be more in favour with Luke and Boyuan 
>>> >>>>>>>>>>>>>>>>> >> approach with schema. Though, maybe ReadAll is not a 
>>> >>>>>>>>>>>>>>>>> >> very suitable name in this case because it will can 
>>> >>>>>>>>>>>>>>>>> >> bring some confusions related to previous pattern of 
>>> >>>>>>>>>>>>>>>>> >> ReadAll uses.
>>> >>>>>>>>>>>>>>>>> >>
>>> >>>>>>>>>>>>>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang 
>>> >>>>>>>>>>>>>>>>> >> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >>
>>> >>>>>>>>>>>>>>>>> >> Sorry for the typo. I mean I think we can go with (3) 
>>> >>>>>>>>>>>>>>>>> >> and (4): use the data type that is schema-aware as the 
>>> >>>>>>>>>>>>>>>>> >> input of ReadAll.
>>> >>>>>>>>>>>>>>>>> >>
>>> >>>>>>>>>>>>>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang 
>>> >>>>>>>>>>>>>>>>> >> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >>>
>>> >>>>>>>>>>>>>>>>> >>> Thanks for the summary, Cham!
>>> >>>>>>>>>>>>>>>>> >>>
>>> >>>>>>>>>>>>>>>>> >>> I think we can go with (2) and (4): use the data type 
>>> >>>>>>>>>>>>>>>>> >>> that is schema-aware as the input of ReadAll.
>>> >>>>>>>>>>>>>>>>> >>>
>>> >>>>>>>>>>>>>>>>> >>> Converting Read into ReadAll helps us to stick with 
>>> >>>>>>>>>>>>>>>>> >>> SDF-like IO. But only having  (3) is not enough to 
>>> >>>>>>>>>>>>>>>>> >>> solve the problem of using ReadAll in x-lang case.
>>> >>>>>>>>>>>>>>>>> >>>
>>> >>>>>>>>>>>>>>>>> >>> The key point of ReadAll is that the input type of 
>>> >>>>>>>>>>>>>>>>> >>> ReadAll should be able to cross language boundaries 
>>> >>>>>>>>>>>>>>>>> >>> and have compatibilities of updating/downgrading. 
>>> >>>>>>>>>>>>>>>>> >>> After investigating some possibilities(pure java pojo 
>>> >>>>>>>>>>>>>>>>> >>> with custom coder, protobuf, row/schema) in Kafka 
>>> >>>>>>>>>>>>>>>>> >>> usage, we find that row/schema fits our needs most. 
>>> >>>>>>>>>>>>>>>>> >>> Here comes (4). I believe that using Read as input of 
>>> >>>>>>>>>>>>>>>>> >>> ReadAll makes sense in some cases, but I also think 
>>> >>>>>>>>>>>>>>>>> >>> not all IOs have the same need. I would treat Read as 
>>> >>>>>>>>>>>>>>>>> >>> a special type as long as the Read is schema-aware.
>>> >>>>>>>>>>>>>>>>> >>>
>>> >>>>>>>>>>>>>>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath 
>>> >>>>>>>>>>>>>>>>> >>> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >>>>
>>> >>>>>>>>>>>>>>>>> >>>> I see. So it seems like there are three options 
>>> >>>>>>>>>>>>>>>>> >>>> discussed so far when it comes to defining source 
>>> >>>>>>>>>>>>>>>>> >>>> descriptors for ReadAll type transforms
>>> >>>>>>>>>>>>>>>>> >>>>
>>> >>>>>>>>>>>>>>>>> >>>> (1) Use Read PTransform as the element type of the 
>>> >>>>>>>>>>>>>>>>> >>>> input PCollection
>>> >>>>>>>>>>>>>>>>> >>>> (2) Use a POJO that describes the source as the data 
>>> >>>>>>>>>>>>>>>>> >>>> element of the input PCollection
>>> >>>>>>>>>>>>>>>>> >>>> (3) Provide a converter as a function to the Read 
>>> >>>>>>>>>>>>>>>>> >>>> transform which essentially will convert it to a 
>>> >>>>>>>>>>>>>>>>> >>>> ReadAll (what Eugene mentioned)
>>> >>>>>>>>>>>>>>>>> >>>>
>>> >>>>>>>>>>>>>>>>> >>>> I feel like (3) is more suitable for a related set 
>>> >>>>>>>>>>>>>>>>> >>>> of source descriptions such as files.
>>> >>>>>>>>>>>>>>>>> >>>> (1) will allow most code-reuse but seems like will 
>>> >>>>>>>>>>>>>>>>> >>>> make it hard to use the ReadAll transform as a 
>>> >>>>>>>>>>>>>>>>> >>>> cross-language transform and will break the 
>>> >>>>>>>>>>>>>>>>> >>>> separation of construction time and runtime 
>>> >>>>>>>>>>>>>>>>> >>>> constructs
>>> >>>>>>>>>>>>>>>>> >>>> (2) could result to less code reuse if not careful 
>>> >>>>>>>>>>>>>>>>> >>>> but will make the transform easier to be used as a 
>>> >>>>>>>>>>>>>>>>> >>>> cross-language transform without additional 
>>> >>>>>>>>>>>>>>>>> >>>> modifications
>>> >>>>>>>>>>>>>>>>> >>>>
>>> >>>>>>>>>>>>>>>>> >>>> Also, with SDF, we can create ReadAll-like 
>>> >>>>>>>>>>>>>>>>> >>>> transforms that are more efficient. So we might be 
>>> >>>>>>>>>>>>>>>>> >>>> able to just define all sources in that format and 
>>> >>>>>>>>>>>>>>>>> >>>> make Read transforms just an easy to use composite 
>>> >>>>>>>>>>>>>>>>> >>>> built on top of that (by adding a preceding Create 
>>> >>>>>>>>>>>>>>>>> >>>> transform).
>>> >>>>>>>>>>>>>>>>> >>>>
>>> >>>>>>>>>>>>>>>>> >>>> Thanks,
>>> >>>>>>>>>>>>>>>>> >>>> Cham
>>> >>>>>>>>>>>>>>>>> >>>>
>>> >>>>>>>>>>>>>>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik 
>>> >>>>>>>>>>>>>>>>> >>>> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >>>>>
>>> >>>>>>>>>>>>>>>>> >>>>> I believe we do require PTransforms to be 
>>> >>>>>>>>>>>>>>>>> >>>>> serializable since anonymous DoFns typically 
>>> >>>>>>>>>>>>>>>>> >>>>> capture the enclosing PTransform.
>>> >>>>>>>>>>>>>>>>> >>>>>
>>> >>>>>>>>>>>>>>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath 
>>> >>>>>>>>>>>>>>>>> >>>>> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>> Seems like Read in PCollection<Read> refers to a 
>>> >>>>>>>>>>>>>>>>> >>>>>> transform, at least here: 
>>> >>>>>>>>>>>>>>>>> >>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>> >>>>>>>>>>>>>>>>> >>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>> I'm in favour of separating construction time 
>>> >>>>>>>>>>>>>>>>> >>>>>> transforms from execution time data objects that 
>>> >>>>>>>>>>>>>>>>> >>>>>> we store in PCollections as Luke mentioned. Also, 
>>> >>>>>>>>>>>>>>>>> >>>>>> we don't guarantee that PTransform is serializable 
>>> >>>>>>>>>>>>>>>>> >>>>>> so users have the additional complexity of 
>>> >>>>>>>>>>>>>>>>> >>>>>> providing a coder whenever a PTransform is used 
>>> >>>>>>>>>>>>>>>>> >>>>>> as a data object.
>>> >>>>>>>>>>>>>>>>> >>>>>> Also, I agree with Boyuan that using simple Java 
>>> >>>>>>>>>>>>>>>>> >>>>>> objects that are convertible to Beam Rows allows us 
>>> >>>>>>>>>>>>>>>>> >>>>>> to make these transforms available to other SDKs 
>>> >>>>>>>>>>>>>>>>> >>>>>> through the cross-language transforms. Using 
>>> >>>>>>>>>>>>>>>>> >>>>>> transforms or complex sources as data objects will 
>>> >>>>>>>>>>>>>>>>> >>>>>> probably make this difficult.
>>> >>>>>>>>>>>>>>>>> >>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>> Thanks,
>>> >>>>>>>>>>>>>>>>> >>>>>> Cham
>>> >>>>>>>>>>>>>>>>> >>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang 
>>> >>>>>>>>>>>>>>>>> >>>>>> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>> Hi Ismael,
>>> >>>>>>>>>>>>>>>>> >>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>> I think the ReadAll in the IO connector refers to 
>>> >>>>>>>>>>>>>>>>> >>>>>>> the IO with an SDF implementation regardless of 
>>> >>>>>>>>>>>>>>>>> >>>>>>> the type of input, whereas Read refers to 
>>> >>>>>>>>>>>>>>>>> >>>>>>> UnboundedSource. One major objection to using 
>>> >>>>>>>>>>>>>>>>> >>>>>>> KafkaIO.Read as the source description is that 
>>> >>>>>>>>>>>>>>>>> >>>>>>> not all configurations of KafkaIO.Read are 
>>> >>>>>>>>>>>>>>>>> >>>>>>> meaningful to populate at execution time. Also, 
>>> >>>>>>>>>>>>>>>>> >>>>>>> when thinking about x-lang usage, the source 
>>> >>>>>>>>>>>>>>>>> >>>>>>> description needs to work across language 
>>> >>>>>>>>>>>>>>>>> >>>>>>> boundaries. As Luke mentioned, it's quite easy to 
>>> >>>>>>>>>>>>>>>>> >>>>>>> infer a Schema from an AutoValue object: 
>>> >>>>>>>>>>>>>>>>> >>>>>>> KafkaSourceDescription.java. Then the coder of 
>>> >>>>>>>>>>>>>>>>> >>>>>>> this schema-aware object will be a SchemaCoder. 
>>> >>>>>>>>>>>>>>>>> >>>>>>> When crossing language boundaries, it's also easy 
>>> >>>>>>>>>>>>>>>>> >>>>>>> to convert a Row into the source description: 
>>> >>>>>>>>>>>>>>>>> >>>>>>> Convert.fromRows.
>>> >>>>>>>>>>>>>>>>> >>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik 
>>> >>>>>>>>>>>>>>>>> >>>>>>> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>> To provide additional context, the KafkaIO 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> ReadAll transform takes a 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> PCollection<KafkaSourceDescriptor>. This 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> KafkaSourceDescriptor is a POJO that contains 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> the configurable parameters for reading from 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> Kafka. This is different from the pattern that 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> Ismael listed because they take 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> PCollection<Read> as input and the Read is the 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> same as the Read PTransform class used for the 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> non read all case.
>>> >>>>>>>>>>>>>>>>> >>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>> The KafkaSourceDescriptor does lead to 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> duplication since parameters used to configure 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> the transform have to be copied over to the 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> source descriptor but decouples how a transform 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> is specified from the object that describes what 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> needs to be done. I believe Ismael's point is 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> that we wouldn't need such a decoupling.
>>> >>>>>>>>>>>>>>>>> >>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>> Another area that hasn't been discussed and I 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> believe is a non-issue is that the Beam Java SDK 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> has the most IO connectors and we would want to 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> use the IO implementations within Beam Go and 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> Beam Python. This brings in its own set of 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> issues related to versioning and compatibility 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> for the wire format and how one parameterizes 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> such transforms. The wire format issue can be 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> solved with either approach by making sure that 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> the cross language expansion always takes the 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> well known format (whatever it may be) and 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> converts it into Read/KafkaSourceDescriptor/... 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> object that is then passed to the ReadAll 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> transform. Boyuan has been looking to make the 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> KafkaSourceDescriptor have a schema so it can be 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> represented as a row and this can be done easily 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> using the AutoValue integration (I don't believe 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> there is anything preventing someone from 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> writing a schema row -> Read -> row adapter or 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> also using the AutoValue configuration if the 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> transform is also an AutoValue).
>>> >>>>>>>>>>>>>>>>> >>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>> I would be more for the code duplication and 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> separation of concerns provided by using a 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> different object to represent the contents of 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> the PCollection from the pipeline construction 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> time PTransform.
>>> >>>>>>>>>>>>>>>>> >>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov 
>>> >>>>>>>>>>>>>>>>> >>>>>>>> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> Hi Ismael,
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> Thanks for taking this on. Have you considered 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> an approach similar (or dual) to 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> FileIO.write(), where we in a sense also have 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> to configure a dynamic number of different IO 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> transforms of the same type (file writes)?
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> E.g. how in this example we configure many 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> aspects of many file writes:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>      .by(Transaction::getType)
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>      // Convert the data to be written to CSVSink
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>      .via(tx -> tx.getType().toFields(tx),
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>           type -> new CSVSink(type.getFieldNames()))
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>      .to(".../path/to/")
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>      .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> we could do something similar for many JdbcIO 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> reads:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> // user-specific type from which all the read parameters can be inferred
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> PCollection<Bar> bars;
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>   .fromQuery(bar -> ...compute query for this bar...)
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>   .withBatchSize(bar -> ...compute batch size for this bar...)
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>   ...etc);
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>> <[email protected]> wrote:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Hello,
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> (my excuses for the long email but this 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> requires context)
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> As part of the move from Source-based IOs to 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> DoFn-based ones, one pattern
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> emerged due to the composable nature of DoFn. 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> The idea is to have a different
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> kind of composable read where we take a 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection of different sorts of
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> intermediate specifications, e.g. tables, 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> queries, etc. For example:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> JdbcIO:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> RedisIO:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<String>, PCollection<KV<String, String>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>, PCollection<Result>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> These patterns enabled richer use cases like 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> doing multiple queries in the same
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Pipeline, querying based on key patterns or 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> querying from multiple tables at the
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> same time but came with some maintenance 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> issues:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> - We ended up needing to add to the ReadAll 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> transforms the parameters for
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   missing information, so we ended up with lots 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> of duplicated `with` methods and
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   error-prone code copied from the Read 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> transforms into the ReadAll transforms.
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> - When you require new parameters you have to 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> expand the input parameters of the
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   intermediary specification into something 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> that resembles the full `Read`
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   definition. For example, imagine you want to 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> read from multiple tables or
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   servers as part of the same pipeline but 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> this was not in the intermediate
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   specification: you end up adding those extra 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> methods (duplicating more code)
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   just to get close to the full Read spec.
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> - If new parameters are added to the Read 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> method we end up adding them
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   systematically to the ReadAll transform too 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> so they are taken into account.
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Due to these issues I recently did a change to 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> test a new approach that is
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> simpler, more complete and maintainable. The 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> code became:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>, 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<Result>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> With this approach users benefit from 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> improvements to the parameters of the normal
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Read because they can rely on the full set of 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Read parameters. But of course there are
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> some minor caveats:
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> 1. You need to push some information into 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> normal Reads, for example
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    partition boundary information or 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Restriction information (in the SDF
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    case). Notice that this consistent 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> approach of ReadAll produces a simple
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    pattern that ends up being almost reusable 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> between IOs (e.g. the non-SDF
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    case):
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   public static class ReadAll extends PTransform<PCollection<Read>, PCollection<SolrDocument>> {
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>     @Override
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>     public PCollection<SolrDocument> expand(PCollection<Read> input) {
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>       return input
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>     }
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>   }
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> 2. If you are using generic types for the 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> results of ReadAll, you must have the
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    Coders used in its definition and require 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> consistent types from the data
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    sources; in practice this means we need to 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> add extra withCoder method(s) on
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>    ReadAll but not the full specs.
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> follow this ReadAll pattern. RedisIO
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> and CassandraIO already have WIP PRs to do so. 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> So I wanted to bring this subject
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> to the mailing list to hear your opinions, and 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> to see whether there are any issues that
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> we might be missing with this idea.
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Also I would like to see if we have consensus 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> to start consistently using the
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> terminology of ReadAll transforms based on 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Read and the readAll() method for new
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> IOs (at this point undoing this in 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> the only remaining inconsistent
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> place, JdbcIO, might not be a good idea, but 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> apart from this we should be ok).
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> I mention this because the recent PR on 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> KafkaIO based on SDF is doing something
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> similar to the old pattern but is called 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll, and maybe it is worth being
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> consistent for the benefit of users.
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Regards,
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Ismaël
>>> >>>>>>>>>>>>>>>>> >>
>>> >>>>>>>>>>>>>>>>> >>
>>> >>>>>>>>>>>>>>>>>
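
For reference, option (2) from the thread above (a plain data object that describes the source and converts to and from a row) could look roughly like the sketch below. It is only an illustration in plain Java and does not use the Beam API: `SourceDescriptorSketch`, `SourceDescriptor`, its three fields, `toRow` and `fromRow` are all hypothetical names, and a `Map<String, Object>` stands in for a Beam Row. In Beam itself the schema inference and the Row conversion would presumably come from the AutoValue schema integration and Convert.fromRows mentioned in the thread, not from hand-written code like this.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: a source description kept separate from any transform. */
public class SourceDescriptorSketch {

  /** Hypothetical descriptor holding only plain, schema-friendly fields. */
  static final class SourceDescriptor {
    final String topic;
    final int partition;
    final long startOffset;

    SourceDescriptor(String topic, int partition, long startOffset) {
      this.topic = topic;
      this.partition = partition;
      this.startOffset = startOffset;
    }

    /** Flattens the descriptor into a language-neutral, row-like map (stand-in for a Row). */
    Map<String, Object> toRow() {
      Map<String, Object> row = new LinkedHashMap<>();
      row.put("topic", topic);
      row.put("partition", partition);
      row.put("startOffset", startOffset);
      return row;
    }

    /** Rebuilds the descriptor from the row-like map, e.g. after crossing a language boundary. */
    static SourceDescriptor fromRow(Map<String, Object> row) {
      return new SourceDescriptor(
          (String) row.get("topic"),
          (Integer) row.get("partition"),
          (Long) row.get("startOffset"));
    }
  }

  public static void main(String[] args) {
    SourceDescriptor original = new SourceDescriptor("events", 3, 42L);
    Map<String, Object> row = original.toRow();
    SourceDescriptor copy = SourceDescriptor.fromRow(row);
    System.out.println(row);
    System.out.println(copy.topic + "/" + copy.partition + "@" + copy.startOffset);
  }
}
```

The trade-off discussed in the thread shows up directly: every field that a Read transform exposes and that should be settable per element has to be repeated on the descriptor, but the descriptor stays a simple value that any SDK could produce.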
