OK, I'm +0 on this change. Using the PTransform as an element is probably better than duplicating the full API on another interface, and I think it's worth getting this unblocked. This will require a Read2 if we have to add options in an upgrade-compatible way.
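For readers skimming the thread, a minimal sketch of the "PTransform as an element" pattern referred to here, with a hypothetical FooIO standing in for any connector that adopts it (FooIO, FooRecord and the option methods are illustrative, not an existing API):

    PCollection<FooRecord> records =
        pipeline
            .apply("Specs", Create.of(
                FooIO.read().from("table_a"),
                FooIO.read().from("table_b")))
            // Create needs a Coder for the Read elements; the schema coder discussed
            // below is what would provide it.
            .apply("ReadAll", FooIO.readAll());

The per-element Read objects carry the full Read API, which is the deduplication benefit discussed throughout the thread; a hypothetical Read2 would only be needed if new options had to be added in an update-compatible way.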
On Tue, Jul 7, 2020 at 3:19 PM Luke Cwik <[email protected]> wrote: > > Robert, you're correct in your understanding that the Read PTransform would > be encoded via the schema coder. > > Kenn, different serializers are ok as long as the output coder can > encode/decode the output type. Different watermark fns are also ok since it > is about computing the watermark for each individual source and won't impact > the watermark computed by other sources. Watermark advancement will still be > held back by the source that is furthest behind and still has the same > problems when a user chooses a watermark fn that was incompatible with the > windowing strategy for producing output (e.g. global window + default trigger > + streaming pipeline). > > Both are pretty close so if we started from scratch then it could go either > way but we aren't starting from scratch (I don't think a Beam 3.0 is likely > to happen in the next few years as there isn't enough stuff that we want to > remove vs the amount of stuff we would gain). > > On Tue, Jul 7, 2020 at 2:57 PM Kenneth Knowles <[email protected]> wrote: >> >> On Tue, Jul 7, 2020 at 2:24 PM Robert Bradshaw <[email protected]> wrote: >>> >>> On Tue, Jul 7, 2020 at 2:06 PM Luke Cwik <[email protected]> wrote: >>> > >>> > Robert, the intent is that the Read object would use a schema coder and >>> > for XLang purposes would be no different then a POJO. >>> >>> Just to clarify, you're saying that the Read PTransform would be >>> encoded via the schema coder? That still feels a bit odd (and >>> specificically if we were designing IO from scratch rather than >>> adapting to what already exists would we choose to use PTransforms as >>> elements?) but would solve the cross language issue. >> >> >> I like this question. If we were designing from scratch, what would we do? >> Would we encourage users to feed Create.of(SourceDescriptor) into ReadAll? >> We would probably provide a friendly wrapper for reading one static thing, >> and call it Read. But it would probably have an API like >> Read.from(SourceDescriptor), thus eliminating duplicate documentation and >> boilerplate that Luke described while keeping the separation that Brian >> described and clarity around xlang environments. But I'm +0 on whatever has >> momentum. I think the main downside is the weirdness around >> serializers/watermarkFn/etc on Read. I am not sure how much this will cause >> users problems. It would be very ambitious of them to produce a >> PCollection<Read> where they had different fns per element... >> >> Kenn >> >>> >>> > The issue of how to deal with closures applies to both equally and that >>> > is why I suggested to favor using data over closures. Once there is an >>> > implementation for how to deal with UDFs in an XLang world, this guidance >>> > can change. >>> > >>> > Kenn, I did mean specifying an enum that the XLang expansion service >>> > would return a serialized blob of code. The XLang expansion service is >>> > responsible for returning an environment that contains all the necessary >>> > dependencies to execute the transforms and the serialized blob of code >>> > and hence would be a non-issue for the caller. >>> > >>> > From reviewing the SDF Kafka PR, the reduction in maintenance is >>> > definitely there (100s of lines of duplicated boilerplate and >>> > documentation). >>> > >>> > What are the next steps to get a resolution on this? 
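As a rough illustration of Kenn's "design from scratch" idea above, a hedged outline of what a Read.from(SourceDescriptor) wrapper could look like (SourceDescriptor, ReadAll and the type parameter are hypothetical placeholders, not an existing Beam API):

    // Hypothetical sketch: Read as a thin, documented wrapper over a descriptor-based ReadAll.
    public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
      private final SourceDescriptor descriptor;  // the single static thing to read

      public static <T> Read<T> from(SourceDescriptor descriptor) {
        return new Read<>(descriptor);
      }

      private Read(SourceDescriptor descriptor) {
        this.descriptor = descriptor;
      }

      @Override
      public PCollection<T> expand(PBegin input) {
        // A single read is just a one-element ReadAll, so options and javadoc live in one place.
        return input
            .apply("Create", Create.of(descriptor))
            .apply("ReadAll", new ReadAll<T>());
      }
    }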
>>> > >>> > On Thu, Jul 2, 2020 at 10:38 AM Robert Bradshaw <[email protected]> >>> > wrote: >>> >> >>> >> On Thu, Jul 2, 2020 at 10:26 AM Kenneth Knowles <[email protected]> wrote: >>> >>> >>> >>> >>> >>> On Wed, Jul 1, 2020 at 4:17 PM Eugene Kirpichov <[email protected]> >>> >>> wrote: >>> >>>> >>> >>>> Kenn - I don't mean an enum of common closures, I mean expressing >>> >>>> closures in a restricted sub-language such as the language of SQL >>> >>>> expressions. >>> >>> >>> >>> >>> >>> My lack of clarity: enums was my phrasing of Luke's item 1). I >>> >>> understood what you meant. I think either a set of well-known closures >>> >>> or a tiny sublanguage could add value. >>> >>> >>> >>>> >>> >>>> That would only work if there is a portable way to interpret SQL >>> >>>> expressions, but if there isn't, maybe there should be - for the sake >>> >>>> of, well, expressing closures portably. Of course these would be >>> >>>> closures that only work with rows - but that seems powerful enough for >>> >>>> many if not most purposes. >>> >>> >>> >>> >>> >>> You can choose a SQL dialect or choose the tiniest subset just for this >>> >>> purpose and go with it. But when the data type going in or out of the >>> >>> lambda are e.g. some Java or Python object then what? One idea is to >>> >>> always require these to be rows. But if you can really get away with a >>> >>> dependency-free context-free lambda, then Javascript or Python is as >>> >>> doable as SQL in terms of having a tiny restricted language for just >>> >>> this purpose. I would expect once it got used, folks would start to ask >>> >>> to include the rest of what the language has to offer - its ecosystem. >>> >>> This is always the main design point I am interested in for >>> >>> "lightweight" embedded UDF proposals. >>> >> >>> >> >>> >> This is getting off the topic of ReadAll, but I think being able to do >>> >> arbitrary computation in preceding/succeeding transform plus a (quite) >>> >> restricted language in the transform itself can go a long way. (For >>> >> example, one could have a dynamic destinations write that takes a >>> >> KV<element, dest> where dest is a format string like >>> >> "foo-{{shard_num}}.txt" to plug in the truly dynamic pieces, but the >>> >> dest string itself can be computed (e.g. based on the element) using >>> >> arbitrary code in the caller language.) >>> >> >>> >>> >>> >>> >>> >>> Kenn >>> >>> >>> >>>> >>> >>>> For example, maybe the Java example: >>> >>>> >>> >>>> PCollection<BankTransaction> transactions = ...; >>> >>>> transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic() >>> >>>> .by(Transaction::getType) >>> >>>> .via(tx -> tx.getType().toFields(tx), // Convert the data to be >>> >>>> written to CSVSink >>> >>>> type -> new CSVSink(type.getFieldNames())) >>> >>>> .to(".../path/to/") >>> >>>> .withNaming(type -> defaultNaming(type + "-transactions", >>> >>>> ".csv")); >>> >>>> >>> >>>> could be written in Python as: >>> >>>> >>> >>>> transactions | fileio.write_dynamic( >>> >>>> by="it.type", # "it" is implicitly available in these SQL >>> >>>> expressions as the same thing as the Java lambda argument >>> >>>> format="it.fields", >>> >>>> sink="CSV_SINK(it.type.field_names)", # A bunch of preset sinks >>> >>>> supported in every language? 
>>> >>>> to=".../path/to/", >>> >>>> naming="DEFAULT_NAMING(CONCAT(it, '-transactions'), '.csv')") >>> >>>> >>> >>>> Again, to be clear, I'm not suggesting to block what Ismael is >>> >>>> proposing on getting this done - getting this done wouldn't be a short >>> >>>> term effort, but seems potentially really nice. >>> >>>> >>> >>>> >>> >>>> On Wed, Jul 1, 2020 at 3:19 PM Robert Burke <[email protected]> wrote: >>> >>>>> >>> >>>>> From the Go side of the table, the Go language doesn't provide a >>> >>>>> mechanism to serialize or access closure data, which means DoFns >>> >>>>> can't be functional closures.This combined with the move to have the >>> >>>>> "Structural DoFns" be serialized using Beam Schemas, has the net >>> >>>>> result that if Go transforms are used for Cross Language, they will >>> >>>>> be configurable with a Schema of the configuration data. >>> >>>>> >>> >>>>> Of course, this just means that each language will probably provide >>> >>>>> whichever mechanisms it likes for use of it's cross language >>> >>>>> transforms. >>> >>>>> >>> >>>>> On Tue, 30 Jun 2020 at 16:07, Kenneth Knowles <[email protected]> wrote: >>> >>>>>> >>> >>>>>> I don't think an enum of most common closures will work. The input >>> >>>>>> types are typically generics that are made concrete by the caller >>> >>>>>> who also provides the closures. I think Luke's (2) is the same idea >>> >>>>>> as my "Java still assembles it [using opaque Python >>> >>>>>> closures/transforms]". It seems like an approach to (3). Passing >>> >>>>>> over actual code could address some cases, but libraries become the >>> >>>>>> issue. >>> >>>>>> >>> >>>>>> I think it is fair to say that "WriteAll" style would involve >>> >>>>>> entering unexplored territory. >>> >>>>>> >>> >>>>>> On the main topic, I think Brian has a pretty strong point and his >>> >>>>>> example of type conversion lambdas is a good example. I did a quick >>> >>>>>> survey and every other property I could find does seem like it fits >>> >>>>>> on the Read, and most IOs have a few of these closures for example >>> >>>>>> also extracting timestamps. So maybe just a resolution convention of >>> >>>>>> putting them on the ReadAll and that taking precedence. Then you >>> >>>>>> would be deserializing a Read transform with insta-crash methods or >>> >>>>>> some such? >>> >>>>>> >>> >>>>>> Kenn >>> >>>>>> >>> >>>>>> On Tue, Jun 30, 2020 at 10:24 AM Eugene Kirpichov >>> >>>>>> <[email protected]> wrote: >>> >>>>>>> >>> >>>>>>> Yeah, mainly I just feel like dynamic reads and dynamic writes (and >>> >>>>>>> perhaps not-yet-invented similar transforms of other kinds) are >>> >>>>>>> tightly related - they are either very similar, or are duals of >>> >>>>>>> each other - so they should use the same approach. If they are >>> >>>>>>> using different approaches, it is a sign that either one of them is >>> >>>>>>> being done wrong or that we are running into a fundamental >>> >>>>>>> limitation of Beam (e.g. difficulty of encoding closures compared >>> >>>>>>> to encoding elements). >>> >>>>>>> >>> >>>>>>> But I agree with Luke that we shouldn't give up on closures. >>> >>>>>>> Especially with the work that has been done on schemas and SQL, I >>> >>>>>>> see no reason why we couldn't express closures in a portable >>> >>>>>>> restricted sub-language. If we can express SQL, we can express many >>> >>>>>>> or most use cases of dynamic reads/writes - I don't mean that we >>> >>>>>>> should actually use SQL (though we could - e.g. 
SQL scalar >>> >>>>>>> expressions seem powerful enough to express the closures appearing >>> >>>>>>> in most use cases of FileIO.writeDynamic), I just mean that SQL is >>> >>>>>>> an existence proof. >>> >>>>>>> >>> >>>>>>> (I don't want to rock the boat too much, just thought I'd chime in >>> >>>>>>> as this topic is dear to my heart) >>> >>>>>>> >>> >>>>>>> On Tue, Jun 30, 2020 at 9:59 AM Luke Cwik <[email protected]> wrote: >>> >>>>>>>> >>> >>>>>>>> Kenn, I'm not too worried about closures since: >>> >>>>>>>> 1) the expansion service for a transform could have a well set of >>> >>>>>>>> defined closures by name that are returned as serialized objects >>> >>>>>>>> that don't need to be interpretable by the caller >>> >>>>>>>> 2) the language could store serialized functions of another >>> >>>>>>>> language as constants >>> >>>>>>>> 3) generic XLang function support will eventually be needed >>> >>>>>>>> but I do agree that closures do make things difficult to express >>> >>>>>>>> vs data which is why primarily why we should prefer data over >>> >>>>>>>> closures when possible and use closures when expressing it with >>> >>>>>>>> data would be too cumbersome. >>> >>>>>>>> >>> >>>>>>>> Brian, so far the cases that have been migrated have shown that >>> >>>>>>>> the source descriptor and the Read transform are almost the same >>> >>>>>>>> (some parameters that only impact pipeline construction such as >>> >>>>>>>> coders differ). >>> >>>>>>>> >>> >>>>>>>> On Mon, Jun 29, 2020 at 2:33 PM Brian Hulette >>> >>>>>>>> <[email protected]> wrote: >>> >>>>>>>>> >>> >>>>>>>>> Sorry for jumping into this late and casting a vote against the >>> >>>>>>>>> consensus... but I think I'd prefer standardizing on a pattern >>> >>>>>>>>> like PCollection<KafkaSourceDescriptor> rather than >>> >>>>>>>>> PCollection<Read>. That approach clearly separates the parameters >>> >>>>>>>>> that are allowed to vary across a ReadAll (the ones defined in >>> >>>>>>>>> KafkaSourceDescriptor) from the parameters that should be >>> >>>>>>>>> constant (other parameters in the Read object, like >>> >>>>>>>>> SerializedFunctions for type conversions, parameters for >>> >>>>>>>>> different operating modes, etc...). I think it's helpful to think >>> >>>>>>>>> of the parameters that are allowed to vary as some "location >>> >>>>>>>>> descriptor", but I imagine IO authors may want other parameters >>> >>>>>>>>> to vary across a ReadAll as well. >>> >>>>>>>>> >>> >>>>>>>>> To me it seems safer to let an IO author "opt-in" to a parameter >>> >>>>>>>>> being dynamic at execution time. >>> >>>>>>>>> >>> >>>>>>>>> Brian >>> >>>>>>>>> >>> >>>>>>>>> On Mon, Jun 29, 2020 at 9:26 AM Eugene Kirpichov >>> >>>>>>>>> <[email protected]> wrote: >>> >>>>>>>>>> >>> >>>>>>>>>> I'd like to raise one more time the question of consistency >>> >>>>>>>>>> between dynamic reads and dynamic writes, per my email at the >>> >>>>>>>>>> beginning of the thread. >>> >>>>>>>>>> If the community prefers ReadAll to read from Read, then should >>> >>>>>>>>>> dynamicWrite's write to Write? >>> >>>>>>>>>> >>> >>>>>>>>>> On Mon, Jun 29, 2020 at 8:57 AM Boyuan Zhang >>> >>>>>>>>>> <[email protected]> wrote: >>> >>>>>>>>>>> >>> >>>>>>>>>>> It seems like most of us agree on the idea that ReadAll should >>> >>>>>>>>>>> read from Read. I'm going to update the Kafka ReadAll with the >>> >>>>>>>>>>> same pattern. >>> >>>>>>>>>>> Thanks for all your help! 
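To make Brian's separation concrete, a hedged sketch of the shape he prefers for Kafka (the readAll() entry point and its methods are illustrative; at the time of this thread the SDF-based Kafka ReadAll was still a work-in-progress PR): fields that may vary per element live in the descriptor, while type-conversion parameters stay fixed on the transform.

    // Allowed to vary across the ReadAll input (topic, partition, start offset, ...):
    PCollection<KafkaSourceDescriptor> descriptors = ...;

    // Fixed at construction time for the whole ReadAll (method names illustrative):
    PCollection<KafkaRecord<String, String>> records =
        descriptors.apply(
            KafkaIO.<String, String>readAll()
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class));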
>>> >>>>>>>>>>> >>> >>>>>>>>>>> On Fri, Jun 26, 2020 at 12:12 PM Chamikara Jayalath >>> >>>>>>>>>>> <[email protected]> wrote: >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> On Fri, Jun 26, 2020 at 11:49 AM Luke Cwik <[email protected]> >>> >>>>>>>>>>>> wrote: >>> >>>>>>>>>>>>> >>> >>>>>>>>>>>>> I would also like to suggest that transforms that implement >>> >>>>>>>>>>>>> ReadAll via Read should also provide methods like: >>> >>>>>>>>>>>>> >>> >>>>>>>>>>>>> // Uses the specified values if unspecified in the input >>> >>>>>>>>>>>>> element from the PCollection<Read>. >>> >>>>>>>>>>>>> withDefaults(Read read); >>> >>>>>>>>>>>>> // Uses the specified values regardless of what the input >>> >>>>>>>>>>>>> element from the PCollection<Read> specifies. >>> >>>>>>>>>>>>> withOverrides(Read read); >>> >>>>>>>>>>>>> >>> >>>>>>>>>>>>> and only adds methods that are required at construction time >>> >>>>>>>>>>>>> (e.g. coders). This way the majority of documentation sits on >>> >>>>>>>>>>>>> the Read transform. >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> +0 from me. Sounds like benefits outweigh the drawbacks here >>> >>>>>>>>>>>> and some of the drawbacks related to cross-language can be >>> >>>>>>>>>>>> overcome through future advancements. >>> >>>>>>>>>>>> Thanks for bringing this up Ismaël. >>> >>>>>>>>>>>> >>> >>>>>>>>>>>> - Cham >>> >>>>>>>>>>>> >>> >>>>>>>>>>>>> >>> >>>>>>>>>>>>> >>> >>>>>>>>>>>>> On Fri, Jun 26, 2020 at 9:58 AM Luke Cwik <[email protected]> >>> >>>>>>>>>>>>> wrote: >>> >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> Ismael, it is good to hear that using Read as the input >>> >>>>>>>>>>>>>> didn't have a bunch of parameters that were being >>> >>>>>>>>>>>>>> skipped/ignored. Also, for the polymorphism issue you have >>> >>>>>>>>>>>>>> to rely on the user correctly telling you the type in such a >>> >>>>>>>>>>>>>> way where it is a common ancestor of all the runtime types >>> >>>>>>>>>>>>>> that will ever be used. This usually boils down to something >>> >>>>>>>>>>>>>> like Serializable or DynamicMessage such that the coder that >>> >>>>>>>>>>>>>> is chosen works for all the runtime types. Using multiple >>> >>>>>>>>>>>>>> types is a valid use case and would allow for a simpler >>> >>>>>>>>>>>>>> graph with less flattens merging the output from multiple >>> >>>>>>>>>>>>>> sources. >>> >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> Boyuan, as you have mentioned we can have a coder for >>> >>>>>>>>>>>>>> KafkaIO.Read which uses schemas even if some of the >>> >>>>>>>>>>>>>> parameters can't be represented in a meaningful way beyond >>> >>>>>>>>>>>>>> "bytes". This would be helpful for cross language as well >>> >>>>>>>>>>>>>> since every parameter would become available if a language >>> >>>>>>>>>>>>>> could support it (e.g. it could serialize a java function up >>> >>>>>>>>>>>>>> front and keep it saved as raw bytes within said language). >>> >>>>>>>>>>>>>> Even if we figure out a better way to do this in the future, >>> >>>>>>>>>>>>>> we'll have to change the schema for the new way anyway. This >>> >>>>>>>>>>>>>> would mean that the external version of the transform adopts >>> >>>>>>>>>>>>>> Row to Read and we drop KafkaSourceDescriptor. The >>> >>>>>>>>>>>>>> conversion from Row to Read could validate that the >>> >>>>>>>>>>>>>> parameters make sense (e.g. the bytes are valid serialized >>> >>>>>>>>>>>>>> functions). 
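A short user-side sketch of the withDefaults/withOverrides idea proposed earlier in this message (only those two method names come from the proposal; FooIO and its option methods are hypothetical):

    PCollection<FooIO.Read> reads = ...;  // per-element Read configurations
    PCollection<FooRecord> records =
        reads.apply(
            FooIO.readAll()
                // Used only when the incoming Read element leaves the option unset.
                .withDefaults(FooIO.read().withFetchSize(1000))
                // Applied regardless of what the incoming Read element specifies.
                .withOverrides(FooIO.read().withTimeout(Duration.standardSeconds(30))));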
The addition of an endReadTime/endReadOffset >>> >>>>>>>>>>>>>> would make sense for KafkaIO.Read as well and this would >>> >>>>>>>>>>>>>> enable having a bounded version that could be used for >>> >>>>>>>>>>>>>> backfills (this doesn't have to be done as part of any >>> >>>>>>>>>>>>>> current ongoing PR). Essentially any parameter that could be >>> >>>>>>>>>>>>>> added for a single instance of a Kafka element+restriction >>> >>>>>>>>>>>>>> would also make sense to the KafkaIO.Read transform since it >>> >>>>>>>>>>>>>> too is a single instance. There are parameters that would >>> >>>>>>>>>>>>>> apply to the ReadAll that wouldn't apply to a read and these >>> >>>>>>>>>>>>>> would be global parameters across all element+restriction >>> >>>>>>>>>>>>>> pairs such as config overrides or default values. >>> >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> I am convinced that we should do as Ismael is suggesting and >>> >>>>>>>>>>>>>> use KafkaIO.Read as the type. >>> >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> >>> >>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:00 PM Chamikara Jayalath >>> >>>>>>>>>>>>>> <[email protected]> wrote: >>> >>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>> Discussion regarding cross-language transforms is a slight >>> >>>>>>>>>>>>>>> tangent here. But I think, in general, it's great if we can >>> >>>>>>>>>>>>>>> use existing transforms (for example, IO connectors) as >>> >>>>>>>>>>>>>>> cross-language transforms without having to build more >>> >>>>>>>>>>>>>>> composites (irrespective of whether in >>> >>>>>>>>>>>>>>> ExternalTransformBuilders or a user pipelines) just to make >>> >>>>>>>>>>>>>>> them cross-language compatible. A future cross-language >>> >>>>>>>>>>>>>>> compatible SchemaCoder might help (assuming that works for >>> >>>>>>>>>>>>>>> Read transform) but I'm not sure we have a good idea when >>> >>>>>>>>>>>>>>> we'll get to that state. >>> >>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>> Thanks, >>> >>>>>>>>>>>>>>> Cham >>> >>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang >>> >>>>>>>>>>>>>>> <[email protected]> wrote: >>> >>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>> For unbounded SDF in Kafka, we also consider the >>> >>>>>>>>>>>>>>>> upgrading/downgrading compatibility in the pipeline update >>> >>>>>>>>>>>>>>>> scenario(For detailed discussion, please refer to >>> >>>>>>>>>>>>>>>> https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E). >>> >>>>>>>>>>>>>>>> In order to obtain the compatibility, it requires the >>> >>>>>>>>>>>>>>>> input of the read SDF is schema-aware. >>> >>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>> Thus the major constraint of mapping KafkaSourceDescriptor >>> >>>>>>>>>>>>>>>> to PCollection<Read> is, the KafkaIO.Read also needs to be >>> >>>>>>>>>>>>>>>> schema-aware, otherwise pipeline updates might fail >>> >>>>>>>>>>>>>>>> unnecessarily. If looking into KafkaIO.Read, not all >>> >>>>>>>>>>>>>>>> necessary fields are compatible with schema, for example, >>> >>>>>>>>>>>>>>>> SerializedFunction. >>> >>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>> I'm kind of confused by why ReadAll<Read, OutputT> is a >>> >>>>>>>>>>>>>>>> common pattern for SDF based IO. The Read can be a common >>> >>>>>>>>>>>>>>>> pattern because the input is always a PBegin. But for an >>> >>>>>>>>>>>>>>>> SDF based IO, the input can be anything. By using Read as >>> >>>>>>>>>>>>>>>> input, we will still have the maintenance cost when SDF IO >>> >>>>>>>>>>>>>>>> supports a new field but Read doesn't consume it. 
For >>> >>>>>>>>>>>>>>>> example, we are discussing adding endOffset and >>> >>>>>>>>>>>>>>>> endReadTime to KafkaSourceDescriptior, which is not used >>> >>>>>>>>>>>>>>>> in KafkaIO.Read. >>> >>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía >>> >>>>>>>>>>>>>>>> <[email protected]> wrote: >>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> We forgot to mention (5) External.Config used in >>> >>>>>>>>>>>>>>>>> cross-lang, see KafkaIO >>> >>>>>>>>>>>>>>>>> ExternalTransformBuilder. This approach is the >>> >>>>>>>>>>>>>>>>> predecessor of (4) and probably a >>> >>>>>>>>>>>>>>>>> really good candidate to be replaced by the Row based >>> >>>>>>>>>>>>>>>>> Configuration Boyuan is >>> >>>>>>>>>>>>>>>>> envisioning (so good to be aware of this). >>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> Thanks for the clear explanation Luke you mention the >>> >>>>>>>>>>>>>>>>> real issue(s). All the >>> >>>>>>>>>>>>>>>>> approaches discussed so far in the end could be easily >>> >>>>>>>>>>>>>>>>> transformed to produce a >>> >>>>>>>>>>>>>>>>> PCollection<Read> and those Read Elements could be read >>> >>>>>>>>>>>>>>>>> by the generic ReadAll >>> >>>>>>>>>>>>>>>>> transform. Notice that this can be internal in some IOs >>> >>>>>>>>>>>>>>>>> e.g. KafkaIO if they >>> >>>>>>>>>>>>>>>>> decide not to expose it. I am not saying that we should >>> >>>>>>>>>>>>>>>>> force every IO to >>> >>>>>>>>>>>>>>>>> support ReadAll in its public API but if we do it is >>> >>>>>>>>>>>>>>>>> probably a good idea to be >>> >>>>>>>>>>>>>>>>> consistent with naming the transform that expects an >>> >>>>>>>>>>>>>>>>> input PCollection<Read> in >>> >>>>>>>>>>>>>>>>> the same way. Also notice that using it will save us of >>> >>>>>>>>>>>>>>>>> the maintenance issues >>> >>>>>>>>>>>>>>>>> discussed in my previous email. >>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> Back to the main concern: the consequences of expansion >>> >>>>>>>>>>>>>>>>> based on Read: So far I >>> >>>>>>>>>>>>>>>>> have not seen consequences for the Splitting part which >>> >>>>>>>>>>>>>>>>> maps really nice >>> >>>>>>>>>>>>>>>>> assuming the Partition info / Restriction is available as >>> >>>>>>>>>>>>>>>>> part of Read. So far >>> >>>>>>>>>>>>>>>>> there are not Serialization because Beam is already >>> >>>>>>>>>>>>>>>>> enforcing this. Notice that >>> >>>>>>>>>>>>>>>>> ReadAll expansion is almost ‘equivalent’ to a poor man >>> >>>>>>>>>>>>>>>>> SDF at least for the >>> >>>>>>>>>>>>>>>>> Bounded case (see the code in my previous email). For the >>> >>>>>>>>>>>>>>>>> other points: >>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> > a) Do all properties set on a Read apply to the >>> >>>>>>>>>>>>>>>>> > ReadAll? For example, the >>> >>>>>>>>>>>>>>>>> > Kafka Read implementation allows you to set the key and >>> >>>>>>>>>>>>>>>>> > value deserializers >>> >>>>>>>>>>>>>>>>> > which are also used to dictate the output PCollection >>> >>>>>>>>>>>>>>>>> > type. It also allows you >>> >>>>>>>>>>>>>>>>> > to set how the watermark should be computed. >>> >>>>>>>>>>>>>>>>> > Technically a user may want the >>> >>>>>>>>>>>>>>>>> > watermark computation to be configurable per Read and >>> >>>>>>>>>>>>>>>>> > they may also want an >>> >>>>>>>>>>>>>>>>> > output type which is polymorphic (e.g. >>> >>>>>>>>>>>>>>>>> > Pcollection<Serializable>). 
>>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> Most of the times they do but for parametric types we >>> >>>>>>>>>>>>>>>>> cannot support different >>> >>>>>>>>>>>>>>>>> types in the outputs of the Read or at least I did not >>> >>>>>>>>>>>>>>>>> find how to do so (is >>> >>>>>>>>>>>>>>>>> there a way to use multiple output Coders on Beam?), we >>> >>>>>>>>>>>>>>>>> saw this in CassandraIO >>> >>>>>>>>>>>>>>>>> and we were discussing adding explicitly these Coders or >>> >>>>>>>>>>>>>>>>> Serializer >>> >>>>>>>>>>>>>>>>> specific methods to the ReadAll transform. This is less >>> >>>>>>>>>>>>>>>>> nice because it will >>> >>>>>>>>>>>>>>>>> imply some repeated methods, but it is still a compromise >>> >>>>>>>>>>>>>>>>> to gain the other >>> >>>>>>>>>>>>>>>>> advantages. I suppose the watermark case you mention is >>> >>>>>>>>>>>>>>>>> similar because you may >>> >>>>>>>>>>>>>>>>> want the watermark to behave differently in each Read and >>> >>>>>>>>>>>>>>>>> we probably don’t >>> >>>>>>>>>>>>>>>>> support this, so it corresponds to the polymorphic >>> >>>>>>>>>>>>>>>>> category. >>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> > b) Read extends PTransform which brings its own object >>> >>>>>>>>>>>>>>>>> > modelling concerns. >>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> > During the implementations of >>> >>>>>>>>>>>>>>>>> > ReadAll(PCollection<Read>), was it discovered >>> >>>>>>>>>>>>>>>>> > that some properties became runtime errors or were >>> >>>>>>>>>>>>>>>>> > ignored if they were set? >>> >>>>>>>>>>>>>>>>> > If no, then the code deduplication is likely worth it >>> >>>>>>>>>>>>>>>>> > because we also get a >>> >>>>>>>>>>>>>>>>> > lot of javadoc deduplication, but if yes is this an >>> >>>>>>>>>>>>>>>>> > acceptable user >>> >>>>>>>>>>>>>>>>> > experience? >>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> No, not so far. This is an interesting part, notice that >>> >>>>>>>>>>>>>>>>> the Read translation >>> >>>>>>>>>>>>>>>>> ends up delegating the read bits to the ReadFn part of >>> >>>>>>>>>>>>>>>>> ReadAll so the ReadFn is >>> >>>>>>>>>>>>>>>>> the real read and must be aware and use all the >>> >>>>>>>>>>>>>>>>> parameters. >>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> @Override >>> >>>>>>>>>>>>>>>>> public PCollection<SolrDocument> expand(PBegin input) >>> >>>>>>>>>>>>>>>>> { >>> >>>>>>>>>>>>>>>>> return input.apply("Create", >>> >>>>>>>>>>>>>>>>> Create.of(this)).apply("ReadAll", readAll()); >>> >>>>>>>>>>>>>>>>> } >>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> I might be missing something for the Unbounded SDF case >>> >>>>>>>>>>>>>>>>> which is the only case >>> >>>>>>>>>>>>>>>>> we have not explored so far. I think one easy way to see >>> >>>>>>>>>>>>>>>>> the limitations would >>> >>>>>>>>>>>>>>>>> be in the ongoing KafkaIO SDF based implementation to try >>> >>>>>>>>>>>>>>>>> to map >>> >>>>>>>>>>>>>>>>> KafkaSourceDescriptor to do the extra PCollection<Read> >>> >>>>>>>>>>>>>>>>> and the Read logic on >>> >>>>>>>>>>>>>>>>> the ReadAll with the SDF to see which constraints we hit, >>> >>>>>>>>>>>>>>>>> the polymorphic ones >>> >>>>>>>>>>>>>>>>> will be there for sure, maybe others will appear (not >>> >>>>>>>>>>>>>>>>> sure). However it would be >>> >>>>>>>>>>>>>>>>> interesting to see if we have a real gain in the >>> >>>>>>>>>>>>>>>>> maintenance points, but well >>> >>>>>>>>>>>>>>>>> let’s not forget also that KafkaIO has a LOT of knobs so >>> >>>>>>>>>>>>>>>>> probably the generic >>> >>>>>>>>>>>>>>>>> implementation could be relatively complex. 
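For the parametric-type caveat described here, a rough sketch of the "explicit Coder on ReadAll" compromise (SplitFn and ReadFn as in the SolrIO-style snippet quoted above, made generic for illustration; a sketch, not the actual CassandraIO change):

    public static class ReadAll<T> extends PTransform<PCollection<Read<T>>, PCollection<T>> {
      private Coder<T> coder;

      // Repeated on ReadAll because the output type cannot be recovered from the Read elements alone.
      public ReadAll<T> withCoder(Coder<T> coder) {
        this.coder = coder;
        return this;
      }

      @Override
      public PCollection<T> expand(PCollection<Read<T>> input) {
        return input
            .apply("Split", ParDo.of(new SplitFn<T>()))
            .apply("Reshuffle", Reshuffle.viaRandomKey())
            .apply("Read", ParDo.of(new ReadFn<T>()))
            .setCoder(coder);
      }
    }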
>>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik >>> >>>>>>>>>>>>>>>>> <[email protected]> wrote: >>> >>>>>>>>>>>>>>>>> > >>> >>>>>>>>>>>>>>>>> > I had mentioned that approach 1 and approach 2 work for >>> >>>>>>>>>>>>>>>>> > cross language. The difference being that the cross >>> >>>>>>>>>>>>>>>>> > language transform would take a well known definition >>> >>>>>>>>>>>>>>>>> > and convert it to the Read transform. A normal user >>> >>>>>>>>>>>>>>>>> > would have a pipeline that would look like: >>> >>>>>>>>>>>>>>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> >>> >>>>>>>>>>>>>>>>> > PCollection<Output> >>> >>>>>>>>>>>>>>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) >>> >>>>>>>>>>>>>>>>> > -> PCollection<Output> >>> >>>>>>>>>>>>>>>>> > >>> >>>>>>>>>>>>>>>>> > And in the cross language case this would look like: >>> >>>>>>>>>>>>>>>>> > 1: PCollection<Row of SourceDescriptor> -> >>> >>>>>>>>>>>>>>>>> > PTransform(Convert Row to Read) -> PCollection<Read> -> >>> >>>>>>>>>>>>>>>>> > PTransform(ReadAll) -> PCollection<Output> >>> >>>>>>>>>>>>>>>>> > 2: PCollection<Row of SourceDescriptor> -> >>> >>>>>>>>>>>>>>>>> > PTransform(Convert Row to SourceDescriptor) -> >>> >>>>>>>>>>>>>>>>> > PCollection<SourceDescriptor> -> PTransform(ReadAll) -> >>> >>>>>>>>>>>>>>>>> > PCollection<Output>* >>> >>>>>>>>>>>>>>>>> > * note that PTransform(Convert Row to SourceDescriptor) >>> >>>>>>>>>>>>>>>>> > only exists since we haven't solved how to use schemas >>> >>>>>>>>>>>>>>>>> > with language bound types in a cross language way. >>> >>>>>>>>>>>>>>>>> > SchemaCoder isn't portable but RowCoder is which is why >>> >>>>>>>>>>>>>>>>> > the conversion step exists. We could have a solution >>> >>>>>>>>>>>>>>>>> > for this at some point in time. >>> >>>>>>>>>>>>>>>>> > >>> >>>>>>>>>>>>>>>>> > My concern with using Read was around: >>> >>>>>>>>>>>>>>>>> > a) Do all properties set on a Read apply to the >>> >>>>>>>>>>>>>>>>> > ReadAll? For example, the Kafka Read implementation >>> >>>>>>>>>>>>>>>>> > allows you to set the key and value deserializers which >>> >>>>>>>>>>>>>>>>> > are also used to dictate the output PCollection type. >>> >>>>>>>>>>>>>>>>> > It also allows you to set how the watermark should be >>> >>>>>>>>>>>>>>>>> > computed. Technically a user may want the watermark >>> >>>>>>>>>>>>>>>>> > computation to be configurable per Read and they may >>> >>>>>>>>>>>>>>>>> > also want an output type which is polymorphic (e.g. >>> >>>>>>>>>>>>>>>>> > PCollection<Serializable>). >>> >>>>>>>>>>>>>>>>> > b) Read extends PTransform which brings its own object >>> >>>>>>>>>>>>>>>>> > modelling concerns. >>> >>>>>>>>>>>>>>>>> > >>> >>>>>>>>>>>>>>>>> > During the implementations of >>> >>>>>>>>>>>>>>>>> > ReadAll(PCollection<Read>), was it discovered that some >>> >>>>>>>>>>>>>>>>> > properties became runtime errors or were ignored if >>> >>>>>>>>>>>>>>>>> > they were set? If no, then the code deduplication is >>> >>>>>>>>>>>>>>>>> > likely worth it because we also get a lot of javadoc >>> >>>>>>>>>>>>>>>>> > deduplication, but if yes is this an acceptable user >>> >>>>>>>>>>>>>>>>> > experience? 
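Concretely, the "Convert Row to SourceDescriptor" step in shape 2 above can lean on the existing schema Convert transform, while the "Convert Row to Read" step in shape 1 would need an analogous, currently hand-written adapter (a hedged sketch; only Convert.fromRows is an existing API here):

    // Rows arrive from the non-Java SDK via the expansion service.
    PCollection<Row> rows = ...;
    PCollection<KafkaSourceDescriptor> descriptors =
        rows.apply(Convert.fromRows(KafkaSourceDescriptor.class));
    // descriptors then feed the ReadAll exactly as in the non-cross-language case.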
>>> >>>>>>>>>>>>>>>>> > >>> >>>>>>>>>>>>>>>>> > >>> >>>>>>>>>>>>>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko >>> >>>>>>>>>>>>>>>>> > <[email protected]> wrote: >>> >>>>>>>>>>>>>>>>> >> >>> >>>>>>>>>>>>>>>>> >> I believe that the initial goal of unifying ReadAll as >>> >>>>>>>>>>>>>>>>> >> a general "PTransform<PCollection<Read>, >>> >>>>>>>>>>>>>>>>> >> PCollection<OutputType>>” was to reduce the amount of >>> >>>>>>>>>>>>>>>>> >> code duplication and error-prone approach related to >>> >>>>>>>>>>>>>>>>> >> this. It makes much sense since usually we have all >>> >>>>>>>>>>>>>>>>> >> needed configuration set in Read objects and, as >>> >>>>>>>>>>>>>>>>> >> Ismaeil mentioned, ReadAll will consist mostly of only >>> >>>>>>>>>>>>>>>>> >> Split-Shuffle-Read stages. So this case usually can >>> >>>>>>>>>>>>>>>>> >> be unified by using PCollection<Read> as input. >>> >>>>>>>>>>>>>>>>> >> >>> >>>>>>>>>>>>>>>>> >> On the other hand, we have another need to use Java >>> >>>>>>>>>>>>>>>>> >> IOs as cross-language transforms (as Luke described) >>> >>>>>>>>>>>>>>>>> >> which seems only partly in common with previous >>> >>>>>>>>>>>>>>>>> >> pattern of ReadAll using. >>> >>>>>>>>>>>>>>>>> >> >>> >>>>>>>>>>>>>>>>> >> I’d be more in favour to have only one concept of read >>> >>>>>>>>>>>>>>>>> >> configuration for all needs but seems it’s not easy >>> >>>>>>>>>>>>>>>>> >> and I’d be more in favour with Luke and Boyuan >>> >>>>>>>>>>>>>>>>> >> approach with schema. Though, maybe ReadAll is not a >>> >>>>>>>>>>>>>>>>> >> very suitable name in this case because it will can >>> >>>>>>>>>>>>>>>>> >> bring some confusions related to previous pattern of >>> >>>>>>>>>>>>>>>>> >> ReadAll uses. >>> >>>>>>>>>>>>>>>>> >> >>> >>>>>>>>>>>>>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang >>> >>>>>>>>>>>>>>>>> >> <[email protected]> wrote: >>> >>>>>>>>>>>>>>>>> >> >>> >>>>>>>>>>>>>>>>> >> Sorry for the typo. I mean I think we can go with (3) >>> >>>>>>>>>>>>>>>>> >> and (4): use the data type that is schema-aware as the >>> >>>>>>>>>>>>>>>>> >> input of ReadAll. >>> >>>>>>>>>>>>>>>>> >> >>> >>>>>>>>>>>>>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang >>> >>>>>>>>>>>>>>>>> >> <[email protected]> wrote: >>> >>>>>>>>>>>>>>>>> >>> >>> >>>>>>>>>>>>>>>>> >>> Thanks for the summary, Cham! >>> >>>>>>>>>>>>>>>>> >>> >>> >>>>>>>>>>>>>>>>> >>> I think we can go with (2) and (4): use the data type >>> >>>>>>>>>>>>>>>>> >>> that is schema-aware as the input of ReadAll. >>> >>>>>>>>>>>>>>>>> >>> >>> >>>>>>>>>>>>>>>>> >>> Converting Read into ReadAll helps us to stick with >>> >>>>>>>>>>>>>>>>> >>> SDF-like IO. But only having (3) is not enough to >>> >>>>>>>>>>>>>>>>> >>> solve the problem of using ReadAll in x-lang case. >>> >>>>>>>>>>>>>>>>> >>> >>> >>>>>>>>>>>>>>>>> >>> The key point of ReadAll is that the input type of >>> >>>>>>>>>>>>>>>>> >>> ReadAll should be able to cross language boundaries >>> >>>>>>>>>>>>>>>>> >>> and have compatibilities of updating/downgrading. >>> >>>>>>>>>>>>>>>>> >>> After investigating some possibilities(pure java pojo >>> >>>>>>>>>>>>>>>>> >>> with custom coder, protobuf, row/schema) in Kafka >>> >>>>>>>>>>>>>>>>> >>> usage, we find that row/schema fits our needs most. >>> >>>>>>>>>>>>>>>>> >>> Here comes (4). I believe that using Read as input of >>> >>>>>>>>>>>>>>>>> >>> ReadAll makes sense in some cases, but I also think >>> >>>>>>>>>>>>>>>>> >>> not all IOs have the same need. I would treat Read as >>> >>>>>>>>>>>>>>>>> >>> a special type as long as the Read is schema-aware. 
>>> >>>>>>>>>>>>>>>>> >>> >>> >>>>>>>>>>>>>>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath >>> >>>>>>>>>>>>>>>>> >>> <[email protected]> wrote: >>> >>>>>>>>>>>>>>>>> >>>> >>> >>>>>>>>>>>>>>>>> >>>> I see. So it seems like there are three options >>> >>>>>>>>>>>>>>>>> >>>> discussed so far when it comes to defining source >>> >>>>>>>>>>>>>>>>> >>>> descriptors for ReadAll type transforms >>> >>>>>>>>>>>>>>>>> >>>> >>> >>>>>>>>>>>>>>>>> >>>> (1) Use Read PTransform as the element type of the >>> >>>>>>>>>>>>>>>>> >>>> input PCollection >>> >>>>>>>>>>>>>>>>> >>>> (2) Use a POJO that describes the source as the data >>> >>>>>>>>>>>>>>>>> >>>> element of the input PCollection >>> >>>>>>>>>>>>>>>>> >>>> (3) Provide a converter as a function to the Read >>> >>>>>>>>>>>>>>>>> >>>> transform which essentially will convert it to a >>> >>>>>>>>>>>>>>>>> >>>> ReadAll (what Eugene mentioned) >>> >>>>>>>>>>>>>>>>> >>>> >>> >>>>>>>>>>>>>>>>> >>>> I feel like (3) is more suitable for a related set >>> >>>>>>>>>>>>>>>>> >>>> of source descriptions such as files. >>> >>>>>>>>>>>>>>>>> >>>> (1) will allow most code-reuse but seems like will >>> >>>>>>>>>>>>>>>>> >>>> make it hard to use the ReadAll transform as a >>> >>>>>>>>>>>>>>>>> >>>> cross-language transform and will break the >>> >>>>>>>>>>>>>>>>> >>>> separation of construction time and runtime >>> >>>>>>>>>>>>>>>>> >>>> constructs >>> >>>>>>>>>>>>>>>>> >>>> (2) could result to less code reuse if not careful >>> >>>>>>>>>>>>>>>>> >>>> but will make the transform easier to be used as a >>> >>>>>>>>>>>>>>>>> >>>> cross-language transform without additional >>> >>>>>>>>>>>>>>>>> >>>> modifications >>> >>>>>>>>>>>>>>>>> >>>> >>> >>>>>>>>>>>>>>>>> >>>> Also, with SDF, we can create ReadAll-like >>> >>>>>>>>>>>>>>>>> >>>> transforms that are more efficient. So we might be >>> >>>>>>>>>>>>>>>>> >>>> able to just define all sources in that format and >>> >>>>>>>>>>>>>>>>> >>>> make Read transforms just an easy to use composite >>> >>>>>>>>>>>>>>>>> >>>> built on top of that (by adding a preceding Create >>> >>>>>>>>>>>>>>>>> >>>> transform). >>> >>>>>>>>>>>>>>>>> >>>> >>> >>>>>>>>>>>>>>>>> >>>> Thanks, >>> >>>>>>>>>>>>>>>>> >>>> Cham >>> >>>>>>>>>>>>>>>>> >>>> >>> >>>>>>>>>>>>>>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik >>> >>>>>>>>>>>>>>>>> >>>> <[email protected]> wrote: >>> >>>>>>>>>>>>>>>>> >>>>> >>> >>>>>>>>>>>>>>>>> >>>>> I believe we do require PTransforms to be >>> >>>>>>>>>>>>>>>>> >>>>> serializable since anonymous DoFns typically >>> >>>>>>>>>>>>>>>>> >>>>> capture the enclosing PTransform. >>> >>>>>>>>>>>>>>>>> >>>>> >>> >>>>>>>>>>>>>>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath >>> >>>>>>>>>>>>>>>>> >>>>> <[email protected]> wrote: >>> >>>>>>>>>>>>>>>>> >>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>> Seems like Read in PCollection<Read> refers to a >>> >>>>>>>>>>>>>>>>> >>>>>> transform, at least here: >>> >>>>>>>>>>>>>>>>> >>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353 >>> >>>>>>>>>>>>>>>>> >>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>> I'm in favour of separating construction time >>> >>>>>>>>>>>>>>>>> >>>>>> transforms from execution time data objects that >>> >>>>>>>>>>>>>>>>> >>>>>> we store in PCollections as Luke mentioned. 
Also, >>> >>>>>>>>>>>>>>>>> >>>>>> we don't guarantee that PTransform is serializable >>> >>>>>>>>>>>>>>>>> >>>>>> so users have the additional complexity of >>> >>>>>>>>>>>>>>>>> >>>>>> providing a corder whenever a PTransform is used >>> >>>>>>>>>>>>>>>>> >>>>>> as a data object. >>> >>>>>>>>>>>>>>>>> >>>>>> Also, agree with Boyuan that using simple Java >>> >>>>>>>>>>>>>>>>> >>>>>> objects that are convertible to Beam Rows allow us >>> >>>>>>>>>>>>>>>>> >>>>>> to make these transforms available to other SDKs >>> >>>>>>>>>>>>>>>>> >>>>>> through the cross-language transforms. Using >>> >>>>>>>>>>>>>>>>> >>>>>> transforms or complex sources as data objects will >>> >>>>>>>>>>>>>>>>> >>>>>> probably make this difficult. >>> >>>>>>>>>>>>>>>>> >>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>> Thanks, >>> >>>>>>>>>>>>>>>>> >>>>>> Cham >>> >>>>>>>>>>>>>>>>> >>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang >>> >>>>>>>>>>>>>>>>> >>>>>> <[email protected]> wrote: >>> >>>>>>>>>>>>>>>>> >>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>> Hi Ismael, >>> >>>>>>>>>>>>>>>>> >>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>> I think the ReadAll in the IO connector refers to >>> >>>>>>>>>>>>>>>>> >>>>>>> the IO with SDF implementation despite the type >>> >>>>>>>>>>>>>>>>> >>>>>>> of input, where Read refers to UnboundedSource. >>> >>>>>>>>>>>>>>>>> >>>>>>> One major pushback of using KafkaIO.Read as >>> >>>>>>>>>>>>>>>>> >>>>>>> source description is that not all configurations >>> >>>>>>>>>>>>>>>>> >>>>>>> of KafkaIO.Read are meaningful to populate during >>> >>>>>>>>>>>>>>>>> >>>>>>> execution time. Also when thinking about x-lang >>> >>>>>>>>>>>>>>>>> >>>>>>> useage, making source description across language >>> >>>>>>>>>>>>>>>>> >>>>>>> boundaries is also necessary. As Luke mentioned, >>> >>>>>>>>>>>>>>>>> >>>>>>> it's quite easy to infer a Schema from an >>> >>>>>>>>>>>>>>>>> >>>>>>> AutoValue object: KafkaSourceDescription.java. >>> >>>>>>>>>>>>>>>>> >>>>>>> Then the coder of this schema-aware object will >>> >>>>>>>>>>>>>>>>> >>>>>>> be a SchemaCoder. When crossing language >>> >>>>>>>>>>>>>>>>> >>>>>>> boundaries, it's also easy to convert a Row into >>> >>>>>>>>>>>>>>>>> >>>>>>> the source description: Convert.fromRows. >>> >>>>>>>>>>>>>>>>> >>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik >>> >>>>>>>>>>>>>>>>> >>>>>>> <[email protected]> wrote: >>> >>>>>>>>>>>>>>>>> >>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>> To provide additional context, the KafkaIO >>> >>>>>>>>>>>>>>>>> >>>>>>>> ReadAll transform takes a >>> >>>>>>>>>>>>>>>>> >>>>>>>> PCollection<KafkaSourceDescriptor>. This >>> >>>>>>>>>>>>>>>>> >>>>>>>> KafkaSourceDescriptor is a POJO that contains >>> >>>>>>>>>>>>>>>>> >>>>>>>> the configurable parameters for reading from >>> >>>>>>>>>>>>>>>>> >>>>>>>> Kafka. This is different from the pattern that >>> >>>>>>>>>>>>>>>>> >>>>>>>> Ismael listed because they take >>> >>>>>>>>>>>>>>>>> >>>>>>>> PCollection<Read> as input and the Read is the >>> >>>>>>>>>>>>>>>>> >>>>>>>> same as the Read PTransform class used for the >>> >>>>>>>>>>>>>>>>> >>>>>>>> non read all case. 
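A hedged sketch of what such a schema-aware descriptor looks like (field names are illustrative only and do not reflect the actual KafkaSourceDescription class): the AutoValue schema integration lets Beam infer a Schema, the element coder becomes a SchemaCoder, and Convert.fromRows/Convert.toRows handle the Row round trip across the language boundary.

    @DefaultSchema(AutoValueSchema.class)
    @AutoValue
    public abstract class SourceDescriptor {
      public abstract String getTopic();            // illustrative fields only
      public abstract Integer getPartition();
      public abstract @Nullable Long getStartOffset();
    }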
>>> >>>>>>>>>>>>>>>>> >>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>> The KafkaSourceDescriptor does lead to >>> >>>>>>>>>>>>>>>>> >>>>>>>> duplication since parameters used to configure >>> >>>>>>>>>>>>>>>>> >>>>>>>> the transform have to be copied over to the >>> >>>>>>>>>>>>>>>>> >>>>>>>> source descriptor but decouples how a transform >>> >>>>>>>>>>>>>>>>> >>>>>>>> is specified from the object that describes what >>> >>>>>>>>>>>>>>>>> >>>>>>>> needs to be done. I believe Ismael's point is >>> >>>>>>>>>>>>>>>>> >>>>>>>> that we wouldn't need such a decoupling. >>> >>>>>>>>>>>>>>>>> >>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>> Another area that hasn't been discussed and I >>> >>>>>>>>>>>>>>>>> >>>>>>>> believe is a non-issue is that the Beam Java SDK >>> >>>>>>>>>>>>>>>>> >>>>>>>> has the most IO connectors and we would want to >>> >>>>>>>>>>>>>>>>> >>>>>>>> use the IO implementations within Beam Go and >>> >>>>>>>>>>>>>>>>> >>>>>>>> Beam Python. This brings in its own set of >>> >>>>>>>>>>>>>>>>> >>>>>>>> issues related to versioning and compatibility >>> >>>>>>>>>>>>>>>>> >>>>>>>> for the wire format and how one parameterizes >>> >>>>>>>>>>>>>>>>> >>>>>>>> such transforms. The wire format issue can be >>> >>>>>>>>>>>>>>>>> >>>>>>>> solved with either approach by making sure that >>> >>>>>>>>>>>>>>>>> >>>>>>>> the cross language expansion always takes the >>> >>>>>>>>>>>>>>>>> >>>>>>>> well known format (whatever it may be) and >>> >>>>>>>>>>>>>>>>> >>>>>>>> converts it into Read/KafkaSourceDescriptor/... >>> >>>>>>>>>>>>>>>>> >>>>>>>> object that is then passed to the ReadAll >>> >>>>>>>>>>>>>>>>> >>>>>>>> transform. Boyuan has been looking to make the >>> >>>>>>>>>>>>>>>>> >>>>>>>> KafkaSourceDescriptor have a schema so it can be >>> >>>>>>>>>>>>>>>>> >>>>>>>> represented as a row and this can be done easily >>> >>>>>>>>>>>>>>>>> >>>>>>>> using the AutoValue integration (I don't believe >>> >>>>>>>>>>>>>>>>> >>>>>>>> there is anything preventing someone from >>> >>>>>>>>>>>>>>>>> >>>>>>>> writing a schema row -> Read -> row adapter or >>> >>>>>>>>>>>>>>>>> >>>>>>>> also using the AutoValue configuration if the >>> >>>>>>>>>>>>>>>>> >>>>>>>> transform is also an AutoValue). >>> >>>>>>>>>>>>>>>>> >>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>> I would be more for the code duplication and >>> >>>>>>>>>>>>>>>>> >>>>>>>> separation of concerns provided by using a >>> >>>>>>>>>>>>>>>>> >>>>>>>> different object to represent the contents of >>> >>>>>>>>>>>>>>>>> >>>>>>>> the PCollection from the pipeline construction >>> >>>>>>>>>>>>>>>>> >>>>>>>> time PTransform. >>> >>>>>>>>>>>>>>>>> >>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov >>> >>>>>>>>>>>>>>>>> >>>>>>>> <[email protected]> wrote: >>> >>>>>>>>>>>>>>>>> >>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> Hi Ismael, >>> >>>>>>>>>>>>>>>>> >>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> Thanks for taking this on. Have you considered >>> >>>>>>>>>>>>>>>>> >>>>>>>>> an approach similar (or dual) to >>> >>>>>>>>>>>>>>>>> >>>>>>>>> FileIO.write(), where we in a sense also have >>> >>>>>>>>>>>>>>>>> >>>>>>>>> to configure a dynamic number different IO >>> >>>>>>>>>>>>>>>>> >>>>>>>>> transforms of the same type (file writes)? >>> >>>>>>>>>>>>>>>>> >>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> E.g. 
how in this example we configure many >>> >>>>>>>>>>>>>>>>> >>>>>>>>> aspects of many file writes: >>> >>>>>>>>>>>>>>>>> >>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType, >>> >>>>>>>>>>>>>>>>> >>>>>>>>> Transaction>writeDynamic() >>> >>>>>>>>>>>>>>>>> >>>>>>>>> .by(Transaction::getType) >>> >>>>>>>>>>>>>>>>> >>>>>>>>> .via(tx -> tx.getType().toFields(tx), // >>> >>>>>>>>>>>>>>>>> >>>>>>>>> Convert the data to be written to CSVSink >>> >>>>>>>>>>>>>>>>> >>>>>>>>> type -> new >>> >>>>>>>>>>>>>>>>> >>>>>>>>> CSVSink(type.getFieldNames())) >>> >>>>>>>>>>>>>>>>> >>>>>>>>> .to(".../path/to/") >>> >>>>>>>>>>>>>>>>> >>>>>>>>> .withNaming(type -> defaultNaming(type + >>> >>>>>>>>>>>>>>>>> >>>>>>>>> "-transactions", ".csv")); >>> >>>>>>>>>>>>>>>>> >>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> we could do something similar for many JdbcIO >>> >>>>>>>>>>>>>>>>> >>>>>>>>> reads: >>> >>>>>>>>>>>>>>>>> >>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> PCollection<Bar> bars; // user-specific type >>> >>>>>>>>>>>>>>>>> >>>>>>>>> from which all the read parameters can be >>> >>>>>>>>>>>>>>>>> >>>>>>>>> inferred >>> >>>>>>>>>>>>>>>>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, >>> >>>>>>>>>>>>>>>>> >>>>>>>>> Moo>readAll() >>> >>>>>>>>>>>>>>>>> >>>>>>>>> .fromQuery(bar -> ...compute query for this >>> >>>>>>>>>>>>>>>>> >>>>>>>>> bar...) >>> >>>>>>>>>>>>>>>>> >>>>>>>>> .withMapper((bar, resultSet) -> new Moo(...)) >>> >>>>>>>>>>>>>>>>> >>>>>>>>> .withBatchSize(bar -> ...compute batch size >>> >>>>>>>>>>>>>>>>> >>>>>>>>> for this bar...) >>> >>>>>>>>>>>>>>>>> >>>>>>>>> ...etc); >>> >>>>>>>>>>>>>>>>> >>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía >>> >>>>>>>>>>>>>>>>> >>>>>>>>> <[email protected]> wrote: >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Hello, >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> (my excuses for the long email but this >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> requires context) >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> As part of the move from Source based IOs to >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> DoFn based ones. One pattern >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> emerged due to the composable nature of DoFn. >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> The idea is to have a different >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> kind of composable reads where we take a >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection of different sorts of >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> intermediate specifications e.g. 
tables, >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> queries, etc, for example: >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> JdbcIO: >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<ParameterT>, >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<OutputT>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> RedisIO: >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<String>, >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<KV<String, String>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> HBaseIO: >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<HBaseQuery>, >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<Result>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> These patterns enabled richer use cases like >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> doing multiple queries in the same >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Pipeline, querying based on key patterns or >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> querying from multiple tables at the >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> same time but came with some maintenance >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> issues: >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> - We ended up needing to add to the ReadAll >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> transforms the parameters for >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> missing information so we ended up with lots >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> of duplicated with methods and >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> error-prone code from the Read transforms >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> into the ReadAll transforms. >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> - When you require new parameters you have to >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> expand the input parameters of the >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> intermediary specification into something >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> that resembles the full `Read` >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> definition for example imagine you want to >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> read from multiple tables or >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> servers as part of the same pipeline but >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> this was not in the intermediate >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> specification you end up adding those extra >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> methods (duplicating more code) >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> just o get close to the be like the Read >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> full spec. >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> - If new parameters are added to the Read >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> method we end up adding them >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> systematically to the ReadAll transform too >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> so they are taken into account. >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Due to these issues I recently did a change to >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> test a new approach that is >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> simpler, more complete and maintainable. 
The >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> code became: >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> HBaseIO: >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>, >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<Result>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> With this approach users gain benefits of >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> improvements on parameters of normal >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Read because they count with the full Read >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> parameters. But of course there are >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> some minor caveats: >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> 1. You need to push some information into >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> normal Reads for example >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> partition boundaries information or >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Restriction information (in the SDF >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> case). Notice that this consistent >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> approach of ReadAll produces a simple >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> pattern that ends up being almost reusable >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> between IOs (e.g. the non-SDF >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> case): >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> public static class ReadAll extends >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<Read>, >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<SolrDocument>> { >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> @Override >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> public PCollection<SolrDocument> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> expand(PCollection<Read> input) { >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> return input >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> .apply("Split", ParDo.of(new >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> SplitFn())) >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> .apply("Reshuffle", >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Reshuffle.viaRandomKey()) >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> .apply("Read", ParDo.of(new >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadFn())); >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> } >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> } >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> 2. If you are using Generic types for the >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> results ReadAll you must have the >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Coders used in its definition and require >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> consistent types from the data >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> sources, in practice this means we need to >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> add extra withCoder method(s) on >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll but not the full specs. >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> follow this ReadAll pattern. RedisIO >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> and CassandraIO have already WIP PRs to do so. >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> So I wanted to bring this subject >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> to the mailing list to see your opinions, and >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> if you see any sort of issues that >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> we might be missing with this idea. 
>>> >>>>>>>>>>>>>>>>> >>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Also I would like to see if we have consensus >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> to start using consistently the >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> terminology of ReadAll transforms based on >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Read and the readAll() method for new >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> IOs (at this point probably outdoing this in >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> the only remaining inconsistent >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> place in JdbcIO might not be a good idea but >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> apart of this we should be ok). >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> I mention this because the recent PR on >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> KafkaIO based on SDF is doing something >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> similar to the old pattern but being called >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll and maybe it is worth to be >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> consistent for the benefit of users. >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Regards, >>> >>>>>>>>>>>>>>>>> >>>>>>>>>> Ismaël >>> >>>>>>>>>>>>>>>>> >> >>> >>>>>>>>>>>>>>>>> >> >>> >>>>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik >>> >>>>>>>>>>>>>>>>> <[email protected]> wrote: >>> >>>>>>>>>>>>>>>>> > >>> >>>>>>>>>>>>>>>>> > I had mentioned that approach 1 and approach 2 work for >>> >>>>>>>>>>>>>>>>> > cross language. The difference being that the cross >>> >>>>>>>>>>>>>>>>> > language transform would take a well known definition >>> >>>>>>>>>>>>>>>>> > and convert it to the Read transform. A normal user >>> >>>>>>>>>>>>>>>>> > would have a pipeline that would look like: >>> >>>>>>>>>>>>>>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> >>> >>>>>>>>>>>>>>>>> > PCollection<Output> >>> >>>>>>>>>>>>>>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) >>> >>>>>>>>>>>>>>>>> > -> PCollection<Output> >>> >>>>>>>>>>>>>>>>> > >>> >>>>>>>>>>>>>>>>> > And in the cross language case this would look like: >>> >>>>>>>>>>>>>>>>> > 1: PCollection<Row of SourceDescriptor> -> >>> >>>>>>>>>>>>>>>>> > PTransform(Convert Row to Read) -> PCollection<Read> -> >>> >>>>>>>>>>>>>>>>> > PTransform(ReadAll) -> PCollection<Output> >>> >>>>>>>>>>>>>>>>> > 2: PCollection<Row of SourceDescriptor> -> >>> >>>>>>>>>>>>>>>>> > PTransform(Convert Row to SourceDescriptor) -> >>> >>>>>>>>>>>>>>>>> > PCollection<SourceDescriptor> -> PTransform(ReadAll) -> >>> >>>>>>>>>>>>>>>>> > PCollection<Output>* >>> >>>>>>>>>>>>>>>>> > * note that PTransform(Convert Row to SourceDescriptor) >>> >>>>>>>>>>>>>>>>> > only exists since we haven't solved how to use schemas >>> >>>>>>>>>>>>>>>>> > with language bound types in a cross language way. >>> >>>>>>>>>>>>>>>>> > SchemaCoder isn't portable but RowCoder is which is why >>> >>>>>>>>>>>>>>>>> > the conversion step exists. We could have a solution >>> >>>>>>>>>>>>>>>>> > for this at some point in time. >>> >>>>>>>>>>>>>>>>> > >>> >>>>>>>>>>>>>>>>> > My concern with using Read was around: >>> >>>>>>>>>>>>>>>>> > a) Do all properties set on a Read apply to the >>> >>>>>>>>>>>>>>>>> > ReadAll? For example, the Kafka Read implementation >>> >>>>>>>>>>>>>>>>> > allows you to set the key and value deserializers which >>> >>>>>>>>>>>>>>>>> > are also used to dictate the output PCollection type. >>> >>>>>>>>>>>>>>>>> > It also allows you to set how the watermark should be >>> >>>>>>>>>>>>>>>>> > computed. 
Technically a user may want the watermark >>> >>>>>>>>>>>>>>>>> > computation to be configurable per Read and they may >>> >>>>>>>>>>>>>>>>> > also want an output type which is polymorphic (e.g. >>> >>>>>>>>>>>>>>>>> > PCollection<Serializable>). >>> >>>>>>>>>>>>>>>>> > b) Read extends PTransform which brings its own object >>> >>>>>>>>>>>>>>>>> > modelling concerns. >>> >>>>>>>>>>>>>>>>> > >>> >>>>>>>>>>>>>>>>> > During the implementations of >>> >>>>>>>>>>>>>>>>> > ReadAll(PCollection<Read>), was it discovered that some >>> >>>>>>>>>>>>>>>>> > properties became runtime errors or were ignored if >>> >>>>>>>>>>>>>>>>> > they were set? If no, then the code deduplication is >>> >>>>>>>>>>>>>>>>> > likely worth it because we also get a lot of javadoc >>> >>>>>>>>>>>>>>>>> > deduplication, but if yes is this an acceptable user >>> >>>>>>>>>>>>>>>>> > experience? >>> >>>>>>>>>>>>>>>>> > >>> >>>>>>>>>>>>>>>>> > >>> >>>>>>>>>>>>>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko >>> >>>>>>>>>>>>>>>>> > <[email protected]> wrote: >>> >>>>>>>>>>>>>>>>> >> >>> >>>>>>>>>>>>>>>>> >> I believe that the initial goal of unifying ReadAll as >>> >>>>>>>>>>>>>>>>> >> a general "PTransform<PCollection<Read>, >>> >>>>>>>>>>>>>>>>> >> PCollection<OutputType>>” was to reduce the amount of >>> >>>>>>>>>>>>>>>>> >> code duplication and error-prone approach related to >>> >>>>>>>>>>>>>>>>> >> this. It makes much sense since usually we have all >>> >>>>>>>>>>>>>>>>> >> needed configuration set in Read objects and, as >>> >>>>>>>>>>>>>>>>> >> Ismaeil mentioned, ReadAll will consist mostly of only >>> >>>>>>>>>>>>>>>>> >> Split-Shuffle-Read stages. So this case usually can >>> >>>>>>>>>>>>>>>>> >> be unified by using PCollection<Read> as input. >>> >>>>>>>>>>>>>>>>> >> >>> >>>>>>>>>>>>>>>>> >> On the other hand, we have another need to use Java >>> >>>>>>>>>>>>>>>>> >> IOs as cross-language transforms (as Luke described) >>> >>>>>>>>>>>>>>>>> >> which seems only partly in common with previous >>> >>>>>>>>>>>>>>>>> >> pattern of ReadAll using. >>> >>>>>>>>>>>>>>>>> >> >>> >>>>>>>>>>>>>>>>> >> I’d be more in favour to have only one concept of read >>> >>>>>>>>>>>>>>>>> >> configuration for all needs but seems it’s not easy >>> >>>>>>>>>>>>>>>>> >> and I’d be more in favour with Luke and Boyuan >>> >>>>>>>>>>>>>>>>> >> approach with schema. Though, maybe ReadAll is not a >>> >>>>>>>>>>>>>>>>> >> very suitable name in this case because it will can >>> >>>>>>>>>>>>>>>>> >> bring some confusions related to previous pattern of >>> >>>>>>>>>>>>>>>>> >> ReadAll uses. >>> >>>>>>>>>>>>>>>>> >> >>> >>>>>>>>>>>>>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang >>> >>>>>>>>>>>>>>>>> >> <[email protected]> wrote: >>> >>>>>>>>>>>>>>>>> >> >>> >>>>>>>>>>>>>>>>> >> Sorry for the typo. I mean I think we can go with (3) >>> >>>>>>>>>>>>>>>>> >> and (4): use the data type that is schema-aware as the >>> >>>>>>>>>>>>>>>>> >> input of ReadAll. >>> >>>>>>>>>>>>>>>>> >> >>> >>>>>>>>>>>>>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang >>> >>>>>>>>>>>>>>>>> >> <[email protected]> wrote: >>> >>>>>>>>>>>>>>>>> >>> >>> >>>>>>>>>>>>>>>>> >>> Thanks for the summary, Cham! >>> >>>>>>>>>>>>>>>>> >>> >>> >>>>>>>>>>>>>>>>> >>> I think we can go with (2) and (4): use the data type >>> >>>>>>>>>>>>>>>>> >>> that is schema-aware as the input of ReadAll. >>> >>>>>>>>>>>>>>>>> >>> >>> >>>>>>>>>>>>>>>>> >>> Converting Read into ReadAll helps us to stick with >>> >>>>>>>>>>>>>>>>> >>> SDF-like IO. 
But only having (3) is not enough to solve the problem of using ReadAll in the x-lang case.

The key point of ReadAll is that the input type of ReadAll should be able to cross language boundaries and have compatibility for updating/downgrading. After investigating some possibilities (pure Java POJO with a custom coder, protobuf, row/schema) in the Kafka usage, we find that row/schema fits our needs most. Here comes (4). I believe that using Read as the input of ReadAll makes sense in some cases, but I also think not all IOs have the same need. I would treat Read as a special type as long as the Read is schema-aware.

On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <[email protected]> wrote:

I see. So it seems like there are three options discussed so far when it comes to defining source descriptors for ReadAll type transforms:

(1) Use the Read PTransform as the element type of the input PCollection
(2) Use a POJO that describes the source as the data element of the input PCollection
(3) Provide a converter as a function to the Read transform which essentially will convert it to a ReadAll (what Eugene mentioned)

I feel like (3) is more suitable for a related set of source descriptions such as files.
(1) will allow the most code reuse but seems like it will make it hard to use the ReadAll transform as a cross-language transform, and it will break the separation of construction-time and runtime constructs.
(2) could result in less code reuse if not careful, but it will make the transform easier to use as a cross-language transform without additional modifications.

Also, with SDF, we can create ReadAll-like transforms that are more efficient. So we might be able to just define all sources in that format and make Read transforms just an easy-to-use composite built on top of that (by adding a preceding Create transform).

Thanks,
Cham
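(For illustration, a rough sketch of that "Read as a thin composite over ReadAll" shape. FooRead, FooReadAll, FooSourceDescriptor and FooResult are hypothetical placeholders, not real Beam classes.)

    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.PBegin;
    import org.apache.beam.sdk.values.PCollection;

    // Read is only sugar: Create a single descriptor element, then defer to the
    // SDF-backed ReadAll, which does the actual splitting and reading.
    class FooRead extends PTransform<PBegin, PCollection<FooResult>> {
      private final FooSourceDescriptor descriptor;

      FooRead(FooSourceDescriptor descriptor) {
        this.descriptor = descriptor;
      }

      @Override
      public PCollection<FooResult> expand(PBegin input) {
        return input
            // Assumes FooSourceDescriptor has a schema/coder that Create can infer.
            .apply("Create", Create.of(descriptor))
            .apply("ReadAll", new FooReadAll());
      }
    }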
On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <[email protected]> wrote:

I believe we do require PTransforms to be serializable since anonymous DoFns typically capture the enclosing PTransform.

On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <[email protected]> wrote:

Seems like Read in PCollection<Read> refers to a transform, at least here:
https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353

I'm in favour of separating construction-time transforms from the execution-time data objects that we store in PCollections, as Luke mentioned. Also, we don't guarantee that PTransform is serializable, so users have the additional complexity of providing a coder whenever a PTransform is used as a data object.
Also, agree with Boyuan that using simple Java objects that are convertible to Beam Rows allows us to make these transforms available to other SDKs through cross-language transforms. Using transforms or complex sources as data objects will probably make this difficult.

Thanks,
Cham

On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <[email protected]> wrote:

Hi Ismael,

I think the ReadAll in the IO connector refers to the IO with the SDF implementation regardless of the type of input, whereas Read refers to UnboundedSource. One major pushback against using KafkaIO.Read as the source description is that not all configurations of KafkaIO.Read are meaningful to populate during execution time. Also, when thinking about x-lang usage, making the source description work across language boundaries is necessary. As Luke mentioned, it's quite easy to infer a Schema from an AutoValue object: KafkaSourceDescription.java. Then the coder of this schema-aware object will be a SchemaCoder. When crossing language boundaries, it's also easy to convert a Row into the source description: Convert.fromRows.
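(To illustrate, a minimal sketch of such a schema-aware descriptor. FooSourceDescriptor and its fields are made-up names in the spirit of KafkaSourceDescription, not the actual class; the AutoValue schema integration is the real Beam piece being leaned on.)

    import com.google.auto.value.AutoValue;
    import org.apache.beam.sdk.schemas.AutoValueSchema;
    import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

    // The schema is inferred from the AutoValue getters, so the descriptor's
    // coder becomes a SchemaCoder and the element can travel as a portable Row.
    @DefaultSchema(AutoValueSchema.class)
    @AutoValue
    abstract class FooSourceDescriptor {
      abstract String getTopic();
      abstract int getPartition();

      static FooSourceDescriptor of(String topic, int partition) {
        return new AutoValue_FooSourceDescriptor(topic, partition);
      }
    }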
On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <[email protected]> wrote:

To provide additional context, the KafkaIO ReadAll transform takes a PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is a POJO that contains the configurable parameters for reading from Kafka. This is different from the pattern that Ismael listed because those take PCollection<Read> as input, and the Read is the same Read PTransform class used for the non-read-all case.

The KafkaSourceDescriptor does lead to duplication since parameters used to configure the transform have to be copied over to the source descriptor, but it decouples how a transform is specified from the object that describes what needs to be done. I believe Ismael's point is that we wouldn't need such a decoupling.

Another area that hasn't been discussed, and which I believe is a non-issue, is that the Beam Java SDK has the most IO connectors and we would want to use those IO implementations within Beam Go and Beam Python. This brings in its own set of issues related to versioning and compatibility for the wire format and how one parameterizes such transforms. The wire format issue can be solved with either approach by making sure that the cross-language expansion always takes the well-known format (whatever it may be) and converts it into the Read/KafkaSourceDescriptor/... object that is then passed to the ReadAll transform. Boyuan has been looking at making KafkaSourceDescriptor have a schema so it can be represented as a row, and this can be done easily using the AutoValue integration (I don't believe there is anything preventing someone from writing a schema row -> Read -> row adapter, or from using the AutoValue configuration if the transform is also an AutoValue).

I would be more for the code duplication and separation of concerns provided by using a different object (from the pipeline-construction-time PTransform) to represent the contents of the PCollection.
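(A rough sketch of that conversion on the expansion side, reusing the hypothetical Foo* placeholders from the sketches above; Convert.fromRows is the only real Beam API assumed here.)

    import org.apache.beam.sdk.schemas.transforms.Convert;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    // The cross-language expansion accepts Rows in the well-known schema,
    // converts them to descriptors, and only then applies the ReadAll.
    class FooExternalReadAll extends PTransform<PCollection<Row>, PCollection<FooResult>> {
      @Override
      public PCollection<FooResult> expand(PCollection<Row> rows) {
        return rows
            .apply("ToDescriptor", Convert.fromRows(FooSourceDescriptor.class))
            .apply("ReadAll", new FooReadAll());
      }
    }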
On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <[email protected]> wrote:

Hi Ismael,

Thanks for taking this on. Have you considered an approach similar (or dual) to FileIO.write(), where we in a sense also have to configure a dynamic number of different IO transforms of the same type (file writes)?

E.g. how in this example we configure many aspects of many file writes:

transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
    .by(Transaction::getType)
    .via(tx -> tx.getType().toFields(tx),  // Convert the data to be written to CSVSink
         type -> new CSVSink(type.getFieldNames()))
    .to(".../path/to/")
    .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));

We could do something similar for many JdbcIO reads:

PCollection<Bar> bars;  // user-specific type from which all the read parameters can be inferred
PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
    .fromQuery(bar -> ...compute query for this bar...)
    .withMapper((bar, resultSet) -> new Moo(...))
    .withBatchSize(bar -> ...compute batch size for this bar...)
    ...etc);

On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <[email protected]> wrote:

Hello,

(my excuses for the long email but this requires context)

As part of the move from Source-based IOs to DoFn-based ones, one pattern emerged due to the composable nature of DoFn. The idea is to have a different kind of composable reads where we take a PCollection of different sorts of intermediate specifications, e.g.
tables, queries, etc., for example:

JdbcIO:
ReadAll<ParameterT, OutputT> extends PTransform<PCollection<ParameterT>, PCollection<OutputT>>

RedisIO:
ReadAll extends PTransform<PCollection<String>, PCollection<KV<String, String>>>

HBaseIO:
ReadAll extends PTransform<PCollection<HBaseQuery>, PCollection<Result>>

These patterns enabled richer use cases like doing multiple queries in the same pipeline, querying based on key patterns, or querying from multiple tables at the same time, but they came with some maintenance issues:

- We ended up needing to add to the ReadAll transforms the parameters for the missing information, so we ended up with lots of duplicated with* methods and error-prone code copied from the Read transforms into the ReadAll transforms (sketched just below).

- When you require new parameters you have to expand the input parameters of the intermediary specification into something that resembles the full `Read` definition. For example, imagine you want to read from multiple tables or servers as part of the same pipeline, but this was not in the intermediate specification: you end up adding those extra methods (duplicating more code) just to get close to the full Read spec.

- If new parameters are added to the Read method we end up adding them systematically to the ReadAll transform too, so they are taken into account.
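(The duplication in the first point looks roughly like this; the builder methods shown are illustrative, not the exact JdbcIO API.)

    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.PBegin;
    import org.apache.beam.sdk.values.PCollection;

    abstract class Read<T> extends PTransform<PBegin, PCollection<T>> {
      abstract Read<T> withQuery(String query);
      abstract Read<T> withFetchSize(int fetchSize);
      abstract Read<T> withCoder(Coder<T> coder);
    }

    // Every knob on Read (and its javadoc) has to be mirrored here as well,
    // and kept in sync whenever Read gains a new parameter.
    abstract class ReadAll<ParameterT, T>
        extends PTransform<PCollection<ParameterT>, PCollection<T>> {
      abstract ReadAll<ParameterT, T> withQuery(String query);
      abstract ReadAll<ParameterT, T> withFetchSize(int fetchSize);
      abstract ReadAll<ParameterT, T> withCoder(Coder<T> coder);
    }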
Due to these issues, I recently did a change to test a new approach that is simpler, more complete and more maintainable. The code became:

HBaseIO:
ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>

With this approach users gain the benefits of improvements to the parameters of the normal Read because they have the full Read parameters available. But of course there are some minor caveats:

1. You need to push some information into the normal Reads, for example partition boundary information or Restriction information (in the SDF case). Notice that this consistent approach to ReadAll produces a simple pattern that ends up being almost reusable between IOs (e.g. the non-SDF case):

   public static class ReadAll extends PTransform<PCollection<Read>,
       PCollection<SolrDocument>> {
     @Override
     public PCollection<SolrDocument> expand(PCollection<Read> input) {
       return input
           .apply("Split", ParDo.of(new SplitFn()))
           .apply("Reshuffle", Reshuffle.viaRandomKey())
           .apply("Read", ParDo.of(new ReadFn()));
     }
   }

2. If you are using generic types for the ReadAll results, you must have the Coders used in its definition and require consistent types from the data sources; in practice this means we need to add extra withCoder method(s) on ReadAll, but not the full specs.
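(A usage sketch of the pattern in point 1; the read()/from()/readAll() factory names are illustrative rather than a statement of the exact SolrIO API, and a coder or schema is assumed to be available for the Read elements.)

    // Several Read specifications flow through a single ReadAll.
    PCollection<SolrDocument> docs =
        pipeline
            .apply("Reads", Create.of(
                SolrIO.read().from("collection_a"),
                SolrIO.read().from("collection_b")))
            .apply("ReadAll", SolrIO.readAll());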
At the moment HBaseIO and SolrIO already follow this ReadAll pattern, and RedisIO and CassandraIO have WIP PRs to do so. So I wanted to bring this subject to the mailing list to see your opinions and whether you see any sort of issues that we might be missing with this idea.

Also I would like to see if we have consensus to start consistently using the terminology of ReadAll transforms based on Read, and the readAll() method, for new IOs (at this point, probably doing this in the only remaining inconsistent place, JdbcIO, might not be a good idea, but apart from this we should be ok).

I mention this because the recent PR on KafkaIO based on SDF is doing something similar to the old pattern but being called ReadAll, and maybe it is worth being consistent for the benefit of users.

Regards,
Ismaël
