Vincent, I will be out in the sense that I cannot really take on more activities: apart from your review I have two more pending reviews + other work to finish, so I prefer not to add work I cannot finish. I am still available for the review, however, so let's finally get this finished. Thanks for pointing this out, because it made me realize that I missed one important part in my last message, which was oriented towards a better solution but ignored the ongoing work.
Even if it has limitations it seems that the ReadAll based on PCollection<Read> approach has clear benefits and Vincent’s use case is one more instance, so I would like to propose that for the in-progress PRs we keep it as proposed until we find a better solution. Notice however that in every case ReadAll is a new feature and should be marked still as @Experimental so we can still improve it / change course. On Wed, Jul 15, 2020 at 1:02 AM Vincent Marquez <[email protected]> wrote: > > Hi everyone, i've been working on > https://issues.apache.org/jira/browse/BEAM-9008 for quite some time now, > trying to contribute back the much improved Cassandra connector. I > originally had been passing around a 'config' object to the readAll, but > after much discussion with Ismaël we decided it was best if I refactor to the > readAll taking a Read<A> as a parameter to be more uniform with some of the > other Connectors. > > I don't see a use case for using Read as a key itself, especially for the > CassandraIO's Read given it itself contains functions (Equality and functions > just seem like a weird thing to even try to deal with). > > For Ismaël and others, I would like to get this merged in sometime soon, as I > believe it has all of the requested functionality. If Ismaël is leaving for > a month, is there someone else who could help me with this? > > > ~Vincent > > > On Tue, Jul 14, 2020 at 2:56 PM Ismaël Mejía <[email protected]> wrote: >> >> It has been really interesting to read all the perspectives in this thread >> and I >> have even switched sides back and forth given the advantages / issues exposed >> here, so it means we have clear pros/cons. >> >> One ‘not so nice‘ discovery related to this discussion for me was BEAM-10375 >> [1] >> tldr; Reads use java serialization so they don’t have a default deterministic >> coder and if they are used as keys they break on GbK because Java’s >> implementation requires keys to be deterministic [2] (is this the case in all >> the other languages?). We can workaround this by having an alternative Coder >> for >> Reads but somehow it does not feel natural and adds extra maintenance. >> >> I really like Kenn’s idea that we should rethink from scratch or write a >> proposal of how we can have designed this with the present awareness about >> DoFn >> based composition, code reuse and schema friendliness. Maybe worth to >> enumerate >> what are the essentials we want to have (or not) first. I will be OOO for the >> next month so I cannot actively work on this, but I will be interested on >> reviewing/contributing in case someone wants to take the lead on a better >> solution or we can in the meantime keep bringing ideas to this thread. >> >> Configuration based on functions translates hardly across languages so I >> wonder >> if we should have also a mechanism to map those. Notice that an important use >> case of this is the detailed configuration of clients for IOs which we have >> started to expose in some IOs to avoid filling IOs API with ‘knobs‘ and >> better >> let the user do their tuning by providing a client via a function. 
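For illustration, a minimal sketch of the "client via a function" style of configuration mentioned above; FooIO, FooClient, FooOptions and withClientFactory are made-up names for this sketch, not an existing Beam connector API:

  import java.io.Serializable;
  import org.apache.beam.sdk.transforms.SerializableFunction;

  // Hypothetical connector pieces, for illustration only.
  interface FooClient extends AutoCloseable {}

  class FooOptions implements Serializable {
    String endpoint;
    int timeoutMillis;
  }

  class FooRead implements Serializable {
    // One factory function replaces many withTimeout()/withRetries()/... knobs:
    // the user builds and tunes the client themselves at execution time.
    SerializableFunction<FooOptions, FooClient> clientFactory;

    FooRead withClientFactory(SerializableFunction<FooOptions, FooClient> factory) {
      this.clientFactory = factory;
      return this;
    }
  }

A closure-valued setting like clientFactory is exactly the kind of configuration that does not translate easily across SDK languages, which is why the thread keeps returning to data-only descriptors or a mapping mechanism for well-known functions.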
>> >> [1] https://issues.apache.org/jira/browse/BEAM-10375 >> [2] >> https://github.com/apache/beam/blob/a9d70fed9069c4f4e9a12860ef711652f5f9c21a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java#L232-L237 >> >> On Thu, Jul 9, 2020 at 5:52 AM Kenneth Knowles <[email protected]> wrote: >> > >> > If we are forced to create a fresh class due to a breaking change, let's >> > migrate to the "what we would do from scratch" approach, please. >> > >> > Kenn >> > >> > On Wed, Jul 8, 2020 at 5:15 PM Robert Bradshaw <[email protected]> wrote: >> >> >> >> OK, I'm +0 on this change. Using the PTransform as an element is >> >> probably better than duplicating the full API on another interface, >> >> and think it's worth getting this ublocked. This will require a Read2 >> >> if we have to add options in a upgrade-compatible way. >> >> >> >> On Tue, Jul 7, 2020 at 3:19 PM Luke Cwik <[email protected]> wrote: >> >> > >> >> > Robert, you're correct in your understanding that the Read PTransform >> >> > would be encoded via the schema coder. >> >> > >> >> > Kenn, different serializers are ok as long as the output coder can >> >> > encode/decode the output type. Different watermark fns are also ok >> >> > since it is about computing the watermark for each individual source >> >> > and won't impact the watermark computed by other sources. Watermark >> >> > advancement will still be held back by the source that is furthest >> >> > behind and still has the same problems when a user chooses a watermark >> >> > fn that was incompatible with the windowing strategy for producing >> >> > output (e.g. global window + default trigger + streaming pipeline). >> >> > >> >> > Both are pretty close so if we started from scratch then it could go >> >> > either way but we aren't starting from scratch (I don't think a Beam >> >> > 3.0 is likely to happen in the next few years as there isn't enough >> >> > stuff that we want to remove vs the amount of stuff we would gain). >> >> > >> >> > On Tue, Jul 7, 2020 at 2:57 PM Kenneth Knowles <[email protected]> wrote: >> >> >> >> >> >> On Tue, Jul 7, 2020 at 2:24 PM Robert Bradshaw <[email protected]> >> >> >> wrote: >> >> >>> >> >> >>> On Tue, Jul 7, 2020 at 2:06 PM Luke Cwik <[email protected]> wrote: >> >> >>> > >> >> >>> > Robert, the intent is that the Read object would use a schema coder >> >> >>> > and for XLang purposes would be no different then a POJO. >> >> >>> >> >> >>> Just to clarify, you're saying that the Read PTransform would be >> >> >>> encoded via the schema coder? That still feels a bit odd (and >> >> >>> specificically if we were designing IO from scratch rather than >> >> >>> adapting to what already exists would we choose to use PTransforms as >> >> >>> elements?) but would solve the cross language issue. >> >> >> >> >> >> >> >> >> I like this question. If we were designing from scratch, what would we >> >> >> do? Would we encourage users to feed Create.of(SourceDescriptor) into >> >> >> ReadAll? We would probably provide a friendly wrapper for reading one >> >> >> static thing, and call it Read. But it would probably have an API like >> >> >> Read.from(SourceDescriptor), thus eliminating duplicate documentation >> >> >> and boilerplate that Luke described while keeping the separation that >> >> >> Brian described and clarity around xlang environments. But I'm +0 on >> >> >> whatever has momentum. I think the main downside is the weirdness >> >> >> around serializers/watermarkFn/etc on Read. 
I am not sure how much >> >> >> this will cause users problems. It would be very ambitious of them to >> >> >> produce a PCollection<Read> where they had different fns per element... >> >> >> >> >> >> Kenn >> >> >> >> >> >>> >> >> >>> > The issue of how to deal with closures applies to both equally and >> >> >>> > that is why I suggested to favor using data over closures. Once >> >> >>> > there is an implementation for how to deal with UDFs in an XLang >> >> >>> > world, this guidance can change. >> >> >>> > >> >> >>> > Kenn, I did mean specifying an enum that the XLang expansion >> >> >>> > service would return a serialized blob of code. The XLang expansion >> >> >>> > service is responsible for returning an environment that contains >> >> >>> > all the necessary dependencies to execute the transforms and the >> >> >>> > serialized blob of code and hence would be a non-issue for the >> >> >>> > caller. >> >> >>> > >> >> >>> > From reviewing the SDF Kafka PR, the reduction in maintenance is >> >> >>> > definitely there (100s of lines of duplicated boilerplate and >> >> >>> > documentation). >> >> >>> > >> >> >>> > What are the next steps to get a resolution on this? >> >> >>> > >> >> >>> > On Thu, Jul 2, 2020 at 10:38 AM Robert Bradshaw >> >> >>> > <[email protected]> wrote: >> >> >>> >> >> >> >>> >> On Thu, Jul 2, 2020 at 10:26 AM Kenneth Knowles <[email protected]> >> >> >>> >> wrote: >> >> >>> >>> >> >> >>> >>> >> >> >>> >>> On Wed, Jul 1, 2020 at 4:17 PM Eugene Kirpichov >> >> >>> >>> <[email protected]> wrote: >> >> >>> >>>> >> >> >>> >>>> Kenn - I don't mean an enum of common closures, I mean >> >> >>> >>>> expressing closures in a restricted sub-language such as the >> >> >>> >>>> language of SQL expressions. >> >> >>> >>> >> >> >>> >>> >> >> >>> >>> My lack of clarity: enums was my phrasing of Luke's item 1). I >> >> >>> >>> understood what you meant. I think either a set of well-known >> >> >>> >>> closures or a tiny sublanguage could add value. >> >> >>> >>> >> >> >>> >>>> >> >> >>> >>>> That would only work if there is a portable way to interpret SQL >> >> >>> >>>> expressions, but if there isn't, maybe there should be - for the >> >> >>> >>>> sake of, well, expressing closures portably. Of course these >> >> >>> >>>> would be closures that only work with rows - but that seems >> >> >>> >>>> powerful enough for many if not most purposes. >> >> >>> >>> >> >> >>> >>> >> >> >>> >>> You can choose a SQL dialect or choose the tiniest subset just >> >> >>> >>> for this purpose and go with it. But when the data type going in >> >> >>> >>> or out of the lambda are e.g. some Java or Python object then >> >> >>> >>> what? One idea is to always require these to be rows. But if you >> >> >>> >>> can really get away with a dependency-free context-free lambda, >> >> >>> >>> then Javascript or Python is as doable as SQL in terms of having >> >> >>> >>> a tiny restricted language for just this purpose. I would expect >> >> >>> >>> once it got used, folks would start to ask to include the rest of >> >> >>> >>> what the language has to offer - its ecosystem. This is always >> >> >>> >>> the main design point I am interested in for "lightweight" >> >> >>> >>> embedded UDF proposals. >> >> >>> >> >> >> >>> >> >> >> >>> >> This is getting off the topic of ReadAll, but I think being able >> >> >>> >> to do arbitrary computation in preceding/succeeding transform plus >> >> >>> >> a (quite) restricted language in the transform itself can go a >> >> >>> >> long way. 
(For example, one could have a dynamic destinations >> >> >>> >> write that takes a KV<element, dest> where dest is a format string >> >> >>> >> like "foo-{{shard_num}}.txt" to plug in the truly dynamic pieces, >> >> >>> >> but the dest string itself can be computed (e.g. based on the >> >> >>> >> element) using arbitrary code in the caller language.) >> >> >>> >> >> >> >>> >>> >> >> >>> >>> >> >> >>> >>> Kenn >> >> >>> >>> >> >> >>> >>>> >> >> >>> >>>> For example, maybe the Java example: >> >> >>> >>>> >> >> >>> >>>> PCollection<BankTransaction> transactions = ...; >> >> >>> >>>> transactions.apply(FileIO.<TransactionType, >> >> >>> >>>> Transaction>writeDynamic() >> >> >>> >>>> .by(Transaction::getType) >> >> >>> >>>> .via(tx -> tx.getType().toFields(tx), // Convert the data >> >> >>> >>>> to be written to CSVSink >> >> >>> >>>> type -> new CSVSink(type.getFieldNames())) >> >> >>> >>>> .to(".../path/to/") >> >> >>> >>>> .withNaming(type -> defaultNaming(type + "-transactions", >> >> >>> >>>> ".csv")); >> >> >>> >>>> >> >> >>> >>>> could be written in Python as: >> >> >>> >>>> >> >> >>> >>>> transactions | fileio.write_dynamic( >> >> >>> >>>> by="it.type", # "it" is implicitly available in these SQL >> >> >>> >>>> expressions as the same thing as the Java lambda argument >> >> >>> >>>> format="it.fields", >> >> >>> >>>> sink="CSV_SINK(it.type.field_names)", # A bunch of preset >> >> >>> >>>> sinks supported in every language? >> >> >>> >>>> to=".../path/to/", >> >> >>> >>>> naming="DEFAULT_NAMING(CONCAT(it, '-transactions'), '.csv')") >> >> >>> >>>> >> >> >>> >>>> Again, to be clear, I'm not suggesting to block what Ismael is >> >> >>> >>>> proposing on getting this done - getting this done wouldn't be a >> >> >>> >>>> short term effort, but seems potentially really nice. >> >> >>> >>>> >> >> >>> >>>> >> >> >>> >>>> On Wed, Jul 1, 2020 at 3:19 PM Robert Burke <[email protected]> >> >> >>> >>>> wrote: >> >> >>> >>>>> >> >> >>> >>>>> From the Go side of the table, the Go language doesn't provide >> >> >>> >>>>> a mechanism to serialize or access closure data, which means >> >> >>> >>>>> DoFns can't be functional closures.This combined with the move >> >> >>> >>>>> to have the "Structural DoFns" be serialized using Beam >> >> >>> >>>>> Schemas, has the net result that if Go transforms are used for >> >> >>> >>>>> Cross Language, they will be configurable with a Schema of the >> >> >>> >>>>> configuration data. >> >> >>> >>>>> >> >> >>> >>>>> Of course, this just means that each language will probably >> >> >>> >>>>> provide whichever mechanisms it likes for use of it's cross >> >> >>> >>>>> language transforms. >> >> >>> >>>>> >> >> >>> >>>>> On Tue, 30 Jun 2020 at 16:07, Kenneth Knowles <[email protected]> >> >> >>> >>>>> wrote: >> >> >>> >>>>>> >> >> >>> >>>>>> I don't think an enum of most common closures will work. The >> >> >>> >>>>>> input types are typically generics that are made concrete by >> >> >>> >>>>>> the caller who also provides the closures. I think Luke's (2) >> >> >>> >>>>>> is the same idea as my "Java still assembles it [using opaque >> >> >>> >>>>>> Python closures/transforms]". It seems like an approach to >> >> >>> >>>>>> (3). Passing over actual code could address some cases, but >> >> >>> >>>>>> libraries become the issue. >> >> >>> >>>>>> >> >> >>> >>>>>> I think it is fair to say that "WriteAll" style would involve >> >> >>> >>>>>> entering unexplored territory. 
>> >> >>> >>>>>> >> >> >>> >>>>>> On the main topic, I think Brian has a pretty strong point and >> >> >>> >>>>>> his example of type conversion lambdas is a good example. I >> >> >>> >>>>>> did a quick survey and every other property I could find does >> >> >>> >>>>>> seem like it fits on the Read, and most IOs have a few of >> >> >>> >>>>>> these closures for example also extracting timestamps. So >> >> >>> >>>>>> maybe just a resolution convention of putting them on the >> >> >>> >>>>>> ReadAll and that taking precedence. Then you would be >> >> >>> >>>>>> deserializing a Read transform with insta-crash methods or >> >> >>> >>>>>> some such? >> >> >>> >>>>>> >> >> >>> >>>>>> Kenn >> >> >>> >>>>>> >> >> >>> >>>>>> On Tue, Jun 30, 2020 at 10:24 AM Eugene Kirpichov >> >> >>> >>>>>> <[email protected]> wrote: >> >> >>> >>>>>>> >> >> >>> >>>>>>> Yeah, mainly I just feel like dynamic reads and dynamic >> >> >>> >>>>>>> writes (and perhaps not-yet-invented similar transforms of >> >> >>> >>>>>>> other kinds) are tightly related - they are either very >> >> >>> >>>>>>> similar, or are duals of each other - so they should use the >> >> >>> >>>>>>> same approach. If they are using different approaches, it is >> >> >>> >>>>>>> a sign that either one of them is being done wrong or that we >> >> >>> >>>>>>> are running into a fundamental limitation of Beam (e.g. >> >> >>> >>>>>>> difficulty of encoding closures compared to encoding >> >> >>> >>>>>>> elements). >> >> >>> >>>>>>> >> >> >>> >>>>>>> But I agree with Luke that we shouldn't give up on closures. >> >> >>> >>>>>>> Especially with the work that has been done on schemas and >> >> >>> >>>>>>> SQL, I see no reason why we couldn't express closures in a >> >> >>> >>>>>>> portable restricted sub-language. If we can express SQL, we >> >> >>> >>>>>>> can express many or most use cases of dynamic reads/writes - >> >> >>> >>>>>>> I don't mean that we should actually use SQL (though we could >> >> >>> >>>>>>> - e.g. SQL scalar expressions seem powerful enough to express >> >> >>> >>>>>>> the closures appearing in most use cases of >> >> >>> >>>>>>> FileIO.writeDynamic), I just mean that SQL is an existence >> >> >>> >>>>>>> proof. >> >> >>> >>>>>>> >> >> >>> >>>>>>> (I don't want to rock the boat too much, just thought I'd >> >> >>> >>>>>>> chime in as this topic is dear to my heart) >> >> >>> >>>>>>> >> >> >>> >>>>>>> On Tue, Jun 30, 2020 at 9:59 AM Luke Cwik <[email protected]> >> >> >>> >>>>>>> wrote: >> >> >>> >>>>>>>> >> >> >>> >>>>>>>> Kenn, I'm not too worried about closures since: >> >> >>> >>>>>>>> 1) the expansion service for a transform could have a well >> >> >>> >>>>>>>> set of defined closures by name that are returned as >> >> >>> >>>>>>>> serialized objects that don't need to be interpretable by >> >> >>> >>>>>>>> the caller >> >> >>> >>>>>>>> 2) the language could store serialized functions of another >> >> >>> >>>>>>>> language as constants >> >> >>> >>>>>>>> 3) generic XLang function support will eventually be needed >> >> >>> >>>>>>>> but I do agree that closures do make things difficult to >> >> >>> >>>>>>>> express vs data which is why primarily why we should prefer >> >> >>> >>>>>>>> data over closures when possible and use closures when >> >> >>> >>>>>>>> expressing it with data would be too cumbersome. 
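To make the "prefer data over closures" point concrete, a minimal illustrative sketch (the descriptor name and fields are assumptions for this sketch, not the actual KafkaIO types):

  import java.io.Serializable;

  class MySourceDescriptor implements Serializable {
    String topic;                  // plain data: representable as a schema field in any SDK
    String valueDeserializerClass; // data standing in for a closure, e.g. the class name
                                   // "org.apache.kafka.common.serialization.StringDeserializer"
  }

  // The closure-based equivalent, e.g.
  //   SerializableFunction<byte[], String> valueFn = bytes -> new String(bytes, UTF_8);
  // is easy to write in Java but has no portable representation for a Python or Go caller,
  // hence the options above: well-known named closures, serialized blobs kept as constants,
  // or eventual generic cross-language UDF support.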
>> >> >>> >>>>>>>> >> >> >>> >>>>>>>> Brian, so far the cases that have been migrated have shown >> >> >>> >>>>>>>> that the source descriptor and the Read transform are almost >> >> >>> >>>>>>>> the same (some parameters that only impact pipeline >> >> >>> >>>>>>>> construction such as coders differ). >> >> >>> >>>>>>>> >> >> >>> >>>>>>>> On Mon, Jun 29, 2020 at 2:33 PM Brian Hulette >> >> >>> >>>>>>>> <[email protected]> wrote: >> >> >>> >>>>>>>>> >> >> >>> >>>>>>>>> Sorry for jumping into this late and casting a vote against >> >> >>> >>>>>>>>> the consensus... but I think I'd prefer standardizing on a >> >> >>> >>>>>>>>> pattern like PCollection<KafkaSourceDescriptor> rather than >> >> >>> >>>>>>>>> PCollection<Read>. That approach clearly separates the >> >> >>> >>>>>>>>> parameters that are allowed to vary across a ReadAll (the >> >> >>> >>>>>>>>> ones defined in KafkaSourceDescriptor) from the parameters >> >> >>> >>>>>>>>> that should be constant (other parameters in the Read >> >> >>> >>>>>>>>> object, like SerializedFunctions for type conversions, >> >> >>> >>>>>>>>> parameters for different operating modes, etc...). I think >> >> >>> >>>>>>>>> it's helpful to think of the parameters that are allowed to >> >> >>> >>>>>>>>> vary as some "location descriptor", but I imagine IO >> >> >>> >>>>>>>>> authors may want other parameters to vary across a ReadAll >> >> >>> >>>>>>>>> as well. >> >> >>> >>>>>>>>> >> >> >>> >>>>>>>>> To me it seems safer to let an IO author "opt-in" to a >> >> >>> >>>>>>>>> parameter being dynamic at execution time. >> >> >>> >>>>>>>>> >> >> >>> >>>>>>>>> Brian >> >> >>> >>>>>>>>> >> >> >>> >>>>>>>>> On Mon, Jun 29, 2020 at 9:26 AM Eugene Kirpichov >> >> >>> >>>>>>>>> <[email protected]> wrote: >> >> >>> >>>>>>>>>> >> >> >>> >>>>>>>>>> I'd like to raise one more time the question of >> >> >>> >>>>>>>>>> consistency between dynamic reads and dynamic writes, per >> >> >>> >>>>>>>>>> my email at the beginning of the thread. >> >> >>> >>>>>>>>>> If the community prefers ReadAll to read from Read, then >> >> >>> >>>>>>>>>> should dynamicWrite's write to Write? >> >> >>> >>>>>>>>>> >> >> >>> >>>>>>>>>> On Mon, Jun 29, 2020 at 8:57 AM Boyuan Zhang >> >> >>> >>>>>>>>>> <[email protected]> wrote: >> >> >>> >>>>>>>>>>> >> >> >>> >>>>>>>>>>> It seems like most of us agree on the idea that ReadAll >> >> >>> >>>>>>>>>>> should read from Read. I'm going to update the Kafka >> >> >>> >>>>>>>>>>> ReadAll with the same pattern. >> >> >>> >>>>>>>>>>> Thanks for all your help! >> >> >>> >>>>>>>>>>> >> >> >>> >>>>>>>>>>> On Fri, Jun 26, 2020 at 12:12 PM Chamikara Jayalath >> >> >>> >>>>>>>>>>> <[email protected]> wrote: >> >> >>> >>>>>>>>>>>> >> >> >>> >>>>>>>>>>>> >> >> >>> >>>>>>>>>>>> >> >> >>> >>>>>>>>>>>> On Fri, Jun 26, 2020 at 11:49 AM Luke Cwik >> >> >>> >>>>>>>>>>>> <[email protected]> wrote: >> >> >>> >>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>> I would also like to suggest that transforms that >> >> >>> >>>>>>>>>>>>> implement ReadAll via Read should also provide methods >> >> >>> >>>>>>>>>>>>> like: >> >> >>> >>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>> // Uses the specified values if unspecified in the >> >> >>> >>>>>>>>>>>>> input element from the PCollection<Read>. >> >> >>> >>>>>>>>>>>>> withDefaults(Read read); >> >> >>> >>>>>>>>>>>>> // Uses the specified values regardless of what the >> >> >>> >>>>>>>>>>>>> input element from the PCollection<Read> specifies. 
>> >> >>> >>>>>>>>>>>>> withOverrides(Read read); >> >> >>> >>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>> and only adds methods that are required at construction >> >> >>> >>>>>>>>>>>>> time (e.g. coders). This way the majority of >> >> >>> >>>>>>>>>>>>> documentation sits on the Read transform. >> >> >>> >>>>>>>>>>>> >> >> >>> >>>>>>>>>>>> >> >> >>> >>>>>>>>>>>> +0 from me. Sounds like benefits outweigh the drawbacks >> >> >>> >>>>>>>>>>>> here and some of the drawbacks related to cross-language >> >> >>> >>>>>>>>>>>> can be overcome through future advancements. >> >> >>> >>>>>>>>>>>> Thanks for bringing this up Ismaël. >> >> >>> >>>>>>>>>>>> >> >> >>> >>>>>>>>>>>> - Cham >> >> >>> >>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>> On Fri, Jun 26, 2020 at 9:58 AM Luke Cwik >> >> >>> >>>>>>>>>>>>> <[email protected]> wrote: >> >> >>> >>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>> Ismael, it is good to hear that using Read as the >> >> >>> >>>>>>>>>>>>>> input didn't have a bunch of parameters that were >> >> >>> >>>>>>>>>>>>>> being skipped/ignored. Also, for the polymorphism >> >> >>> >>>>>>>>>>>>>> issue you have to rely on the user correctly telling >> >> >>> >>>>>>>>>>>>>> you the type in such a way where it is a common >> >> >>> >>>>>>>>>>>>>> ancestor of all the runtime types that will ever be >> >> >>> >>>>>>>>>>>>>> used. This usually boils down to something like >> >> >>> >>>>>>>>>>>>>> Serializable or DynamicMessage such that the coder >> >> >>> >>>>>>>>>>>>>> that is chosen works for all the runtime types. Using >> >> >>> >>>>>>>>>>>>>> multiple types is a valid use case and would allow for >> >> >>> >>>>>>>>>>>>>> a simpler graph with less flattens merging the output >> >> >>> >>>>>>>>>>>>>> from multiple sources. >> >> >>> >>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>> Boyuan, as you have mentioned we can have a coder for >> >> >>> >>>>>>>>>>>>>> KafkaIO.Read which uses schemas even if some of the >> >> >>> >>>>>>>>>>>>>> parameters can't be represented in a meaningful way >> >> >>> >>>>>>>>>>>>>> beyond "bytes". This would be helpful for cross >> >> >>> >>>>>>>>>>>>>> language as well since every parameter would become >> >> >>> >>>>>>>>>>>>>> available if a language could support it (e.g. it >> >> >>> >>>>>>>>>>>>>> could serialize a java function up front and keep it >> >> >>> >>>>>>>>>>>>>> saved as raw bytes within said language). Even if we >> >> >>> >>>>>>>>>>>>>> figure out a better way to do this in the future, >> >> >>> >>>>>>>>>>>>>> we'll have to change the schema for the new way >> >> >>> >>>>>>>>>>>>>> anyway. This would mean that the external version of >> >> >>> >>>>>>>>>>>>>> the transform adopts Row to Read and we drop >> >> >>> >>>>>>>>>>>>>> KafkaSourceDescriptor. The conversion from Row to Read >> >> >>> >>>>>>>>>>>>>> could validate that the parameters make sense (e.g. >> >> >>> >>>>>>>>>>>>>> the bytes are valid serialized functions). The >> >> >>> >>>>>>>>>>>>>> addition of an endReadTime/endReadOffset would make >> >> >>> >>>>>>>>>>>>>> sense for KafkaIO.Read as well and this would enable >> >> >>> >>>>>>>>>>>>>> having a bounded version that could be used for >> >> >>> >>>>>>>>>>>>>> backfills (this doesn't have to be done as part of any >> >> >>> >>>>>>>>>>>>>> current ongoing PR). 
Essentially any parameter that >> >> >>> >>>>>>>>>>>>>> could be added for a single instance of a Kafka >> >> >>> >>>>>>>>>>>>>> element+restriction would also make sense to the >> >> >>> >>>>>>>>>>>>>> KafkaIO.Read transform since it too is a single >> >> >>> >>>>>>>>>>>>>> instance. There are parameters that would apply to the >> >> >>> >>>>>>>>>>>>>> ReadAll that wouldn't apply to a read and these would >> >> >>> >>>>>>>>>>>>>> be global parameters across all element+restriction >> >> >>> >>>>>>>>>>>>>> pairs such as config overrides or default values. >> >> >>> >>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>> I am convinced that we should do as Ismael is >> >> >>> >>>>>>>>>>>>>> suggesting and use KafkaIO.Read as the type. >> >> >>> >>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:00 PM Chamikara Jayalath >> >> >>> >>>>>>>>>>>>>> <[email protected]> wrote: >> >> >>> >>>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>>> Discussion regarding cross-language transforms is a >> >> >>> >>>>>>>>>>>>>>> slight tangent here. But I think, in general, it's >> >> >>> >>>>>>>>>>>>>>> great if we can use existing transforms (for example, >> >> >>> >>>>>>>>>>>>>>> IO connectors) as cross-language transforms without >> >> >>> >>>>>>>>>>>>>>> having to build more composites (irrespective of >> >> >>> >>>>>>>>>>>>>>> whether in ExternalTransformBuilders or a user >> >> >>> >>>>>>>>>>>>>>> pipelines) just to make them cross-language >> >> >>> >>>>>>>>>>>>>>> compatible. A future cross-language compatible >> >> >>> >>>>>>>>>>>>>>> SchemaCoder might help (assuming that works for Read >> >> >>> >>>>>>>>>>>>>>> transform) but I'm not sure we have a good idea when >> >> >>> >>>>>>>>>>>>>>> we'll get to that state. >> >> >>> >>>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>>> Thanks, >> >> >>> >>>>>>>>>>>>>>> Cham >> >> >>> >>>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang >> >> >>> >>>>>>>>>>>>>>> <[email protected]> wrote: >> >> >>> >>>>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>>>> For unbounded SDF in Kafka, we also consider the >> >> >>> >>>>>>>>>>>>>>>> upgrading/downgrading compatibility in the pipeline >> >> >>> >>>>>>>>>>>>>>>> update scenario(For detailed discussion, please >> >> >>> >>>>>>>>>>>>>>>> refer to >> >> >>> >>>>>>>>>>>>>>>> https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E). >> >> >>> >>>>>>>>>>>>>>>> In order to obtain the compatibility, it requires >> >> >>> >>>>>>>>>>>>>>>> the input of the read SDF is schema-aware. >> >> >>> >>>>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>>>> Thus the major constraint of mapping >> >> >>> >>>>>>>>>>>>>>>> KafkaSourceDescriptor to PCollection<Read> is, the >> >> >>> >>>>>>>>>>>>>>>> KafkaIO.Read also needs to be schema-aware, >> >> >>> >>>>>>>>>>>>>>>> otherwise pipeline updates might fail unnecessarily. >> >> >>> >>>>>>>>>>>>>>>> If looking into KafkaIO.Read, not all necessary >> >> >>> >>>>>>>>>>>>>>>> fields are compatible with schema, for example, >> >> >>> >>>>>>>>>>>>>>>> SerializedFunction. >> >> >>> >>>>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>>>> I'm kind of confused by why ReadAll<Read, OutputT> >> >> >>> >>>>>>>>>>>>>>>> is a common pattern for SDF based IO. The Read can >> >> >>> >>>>>>>>>>>>>>>> be a common pattern because the input is always a >> >> >>> >>>>>>>>>>>>>>>> PBegin. But for an SDF based IO, the input can be >> >> >>> >>>>>>>>>>>>>>>> anything. 
By using Read as input, we will still have >> >> >>> >>>>>>>>>>>>>>>> the maintenance cost when SDF IO supports a new >> >> >>> >>>>>>>>>>>>>>>> field but Read doesn't consume it. For example, we >> >> >>> >>>>>>>>>>>>>>>> are discussing adding endOffset and endReadTime to >> >> >>> >>>>>>>>>>>>>>>> KafkaSourceDescriptior, which is not used in >> >> >>> >>>>>>>>>>>>>>>> KafkaIO.Read. >> >> >>> >>>>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía >> >> >>> >>>>>>>>>>>>>>>> <[email protected]> wrote: >> >> >>> >>>>>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> We forgot to mention (5) External.Config used in >> >> >>> >>>>>>>>>>>>>>>>> cross-lang, see KafkaIO >> >> >>> >>>>>>>>>>>>>>>>> ExternalTransformBuilder. This approach is the >> >> >>> >>>>>>>>>>>>>>>>> predecessor of (4) and probably a >> >> >>> >>>>>>>>>>>>>>>>> really good candidate to be replaced by the Row >> >> >>> >>>>>>>>>>>>>>>>> based Configuration Boyuan is >> >> >>> >>>>>>>>>>>>>>>>> envisioning (so good to be aware of this). >> >> >>> >>>>>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> Thanks for the clear explanation Luke you mention >> >> >>> >>>>>>>>>>>>>>>>> the real issue(s). All the >> >> >>> >>>>>>>>>>>>>>>>> approaches discussed so far in the end could be >> >> >>> >>>>>>>>>>>>>>>>> easily transformed to produce a >> >> >>> >>>>>>>>>>>>>>>>> PCollection<Read> and those Read Elements could be >> >> >>> >>>>>>>>>>>>>>>>> read by the generic ReadAll >> >> >>> >>>>>>>>>>>>>>>>> transform. Notice that this can be internal in some >> >> >>> >>>>>>>>>>>>>>>>> IOs e.g. KafkaIO if they >> >> >>> >>>>>>>>>>>>>>>>> decide not to expose it. I am not saying that we >> >> >>> >>>>>>>>>>>>>>>>> should force every IO to >> >> >>> >>>>>>>>>>>>>>>>> support ReadAll in its public API but if we do it >> >> >>> >>>>>>>>>>>>>>>>> is probably a good idea to be >> >> >>> >>>>>>>>>>>>>>>>> consistent with naming the transform that expects >> >> >>> >>>>>>>>>>>>>>>>> an input PCollection<Read> in >> >> >>> >>>>>>>>>>>>>>>>> the same way. Also notice that using it will save >> >> >>> >>>>>>>>>>>>>>>>> us of the maintenance issues >> >> >>> >>>>>>>>>>>>>>>>> discussed in my previous email. >> >> >>> >>>>>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> Back to the main concern: the consequences of >> >> >>> >>>>>>>>>>>>>>>>> expansion based on Read: So far I >> >> >>> >>>>>>>>>>>>>>>>> have not seen consequences for the Splitting part >> >> >>> >>>>>>>>>>>>>>>>> which maps really nice >> >> >>> >>>>>>>>>>>>>>>>> assuming the Partition info / Restriction is >> >> >>> >>>>>>>>>>>>>>>>> available as part of Read. So far >> >> >>> >>>>>>>>>>>>>>>>> there are not Serialization because Beam is already >> >> >>> >>>>>>>>>>>>>>>>> enforcing this. Notice that >> >> >>> >>>>>>>>>>>>>>>>> ReadAll expansion is almost ‘equivalent’ to a poor >> >> >>> >>>>>>>>>>>>>>>>> man SDF at least for the >> >> >>> >>>>>>>>>>>>>>>>> Bounded case (see the code in my previous email). >> >> >>> >>>>>>>>>>>>>>>>> For the other points: >> >> >>> >>>>>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> > a) Do all properties set on a Read apply to the >> >> >>> >>>>>>>>>>>>>>>>> > ReadAll? For example, the >> >> >>> >>>>>>>>>>>>>>>>> > Kafka Read implementation allows you to set the >> >> >>> >>>>>>>>>>>>>>>>> > key and value deserializers >> >> >>> >>>>>>>>>>>>>>>>> > which are also used to dictate the output >> >> >>> >>>>>>>>>>>>>>>>> > PCollection type. It also allows you >> >> >>> >>>>>>>>>>>>>>>>> > to set how the watermark should be computed. 
>> >> >>> >>>>>>>>>>>>>>>>> > Technically a user may want the >> >> >>> >>>>>>>>>>>>>>>>> > watermark computation to be configurable per Read >> >> >>> >>>>>>>>>>>>>>>>> > and they may also want an >> >> >>> >>>>>>>>>>>>>>>>> > output type which is polymorphic (e.g. >> >> >>> >>>>>>>>>>>>>>>>> > Pcollection<Serializable>). >> >> >>> >>>>>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> Most of the times they do but for parametric types >> >> >>> >>>>>>>>>>>>>>>>> we cannot support different >> >> >>> >>>>>>>>>>>>>>>>> types in the outputs of the Read or at least I did >> >> >>> >>>>>>>>>>>>>>>>> not find how to do so (is >> >> >>> >>>>>>>>>>>>>>>>> there a way to use multiple output Coders on >> >> >>> >>>>>>>>>>>>>>>>> Beam?), we saw this in CassandraIO >> >> >>> >>>>>>>>>>>>>>>>> and we were discussing adding explicitly these >> >> >>> >>>>>>>>>>>>>>>>> Coders or Serializer >> >> >>> >>>>>>>>>>>>>>>>> specific methods to the ReadAll transform. This is >> >> >>> >>>>>>>>>>>>>>>>> less nice because it will >> >> >>> >>>>>>>>>>>>>>>>> imply some repeated methods, but it is still a >> >> >>> >>>>>>>>>>>>>>>>> compromise to gain the other >> >> >>> >>>>>>>>>>>>>>>>> advantages. I suppose the watermark case you >> >> >>> >>>>>>>>>>>>>>>>> mention is similar because you may >> >> >>> >>>>>>>>>>>>>>>>> want the watermark to behave differently in each >> >> >>> >>>>>>>>>>>>>>>>> Read and we probably don’t >> >> >>> >>>>>>>>>>>>>>>>> support this, so it corresponds to the polymorphic >> >> >>> >>>>>>>>>>>>>>>>> category. >> >> >>> >>>>>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> > b) Read extends PTransform which brings its own >> >> >>> >>>>>>>>>>>>>>>>> > object modelling concerns. >> >> >>> >>>>>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> > During the implementations of >> >> >>> >>>>>>>>>>>>>>>>> > ReadAll(PCollection<Read>), was it discovered >> >> >>> >>>>>>>>>>>>>>>>> > that some properties became runtime errors or >> >> >>> >>>>>>>>>>>>>>>>> > were ignored if they were set? >> >> >>> >>>>>>>>>>>>>>>>> > If no, then the code deduplication is likely >> >> >>> >>>>>>>>>>>>>>>>> > worth it because we also get a >> >> >>> >>>>>>>>>>>>>>>>> > lot of javadoc deduplication, but if yes is this >> >> >>> >>>>>>>>>>>>>>>>> > an acceptable user >> >> >>> >>>>>>>>>>>>>>>>> > experience? >> >> >>> >>>>>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> No, not so far. This is an interesting part, notice >> >> >>> >>>>>>>>>>>>>>>>> that the Read translation >> >> >>> >>>>>>>>>>>>>>>>> ends up delegating the read bits to the ReadFn part >> >> >>> >>>>>>>>>>>>>>>>> of ReadAll so the ReadFn is >> >> >>> >>>>>>>>>>>>>>>>> the real read and must be aware and use all the >> >> >>> >>>>>>>>>>>>>>>>> parameters. >> >> >>> >>>>>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> @Override >> >> >>> >>>>>>>>>>>>>>>>> public PCollection<SolrDocument> expand(PBegin >> >> >>> >>>>>>>>>>>>>>>>> input) { >> >> >>> >>>>>>>>>>>>>>>>> return input.apply("Create", >> >> >>> >>>>>>>>>>>>>>>>> Create.of(this)).apply("ReadAll", readAll()); >> >> >>> >>>>>>>>>>>>>>>>> } >> >> >>> >>>>>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> I might be missing something for the Unbounded SDF >> >> >>> >>>>>>>>>>>>>>>>> case which is the only case >> >> >>> >>>>>>>>>>>>>>>>> we have not explored so far. 
I think one easy way >> >> >>> >>>>>>>>>>>>>>>>> to see the limitations would >> >> >>> >>>>>>>>>>>>>>>>> be in the ongoing KafkaIO SDF based implementation >> >> >>> >>>>>>>>>>>>>>>>> to try to map >> >> >>> >>>>>>>>>>>>>>>>> KafkaSourceDescriptor to do the extra >> >> >>> >>>>>>>>>>>>>>>>> PCollection<Read> and the Read logic on >> >> >>> >>>>>>>>>>>>>>>>> the ReadAll with the SDF to see which constraints >> >> >>> >>>>>>>>>>>>>>>>> we hit, the polymorphic ones >> >> >>> >>>>>>>>>>>>>>>>> will be there for sure, maybe others will appear >> >> >>> >>>>>>>>>>>>>>>>> (not sure). However it would be >> >> >>> >>>>>>>>>>>>>>>>> interesting to see if we have a real gain in the >> >> >>> >>>>>>>>>>>>>>>>> maintenance points, but well >> >> >>> >>>>>>>>>>>>>>>>> let’s not forget also that KafkaIO has a LOT of >> >> >>> >>>>>>>>>>>>>>>>> knobs so probably the generic >> >> >>> >>>>>>>>>>>>>>>>> implementation could be relatively complex. >> >> >>> >>>>>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik >> >> >>> >>>>>>>>>>>>>>>>> <[email protected]> wrote: >> >> >>> >>>>>>>>>>>>>>>>> > >> >> >>> >>>>>>>>>>>>>>>>> > I had mentioned that approach 1 and approach 2 >> >> >>> >>>>>>>>>>>>>>>>> > work for cross language. The difference being >> >> >>> >>>>>>>>>>>>>>>>> > that the cross language transform would take a >> >> >>> >>>>>>>>>>>>>>>>> > well known definition and convert it to the Read >> >> >>> >>>>>>>>>>>>>>>>> > transform. A normal user would have a pipeline >> >> >>> >>>>>>>>>>>>>>>>> > that would look like: >> >> >>> >>>>>>>>>>>>>>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> >> >> >>> >>>>>>>>>>>>>>>>> > PCollection<Output> >> >> >>> >>>>>>>>>>>>>>>>> > 2: PCollection<SourceDescriptor> -> >> >> >>> >>>>>>>>>>>>>>>>> > PTransform(ReadAll) -> PCollection<Output> >> >> >>> >>>>>>>>>>>>>>>>> > >> >> >>> >>>>>>>>>>>>>>>>> > And in the cross language case this would look >> >> >>> >>>>>>>>>>>>>>>>> > like: >> >> >>> >>>>>>>>>>>>>>>>> > 1: PCollection<Row of SourceDescriptor> -> >> >> >>> >>>>>>>>>>>>>>>>> > PTransform(Convert Row to Read) -> >> >> >>> >>>>>>>>>>>>>>>>> > PCollection<Read> -> PTransform(ReadAll) -> >> >> >>> >>>>>>>>>>>>>>>>> > PCollection<Output> >> >> >>> >>>>>>>>>>>>>>>>> > 2: PCollection<Row of SourceDescriptor> -> >> >> >>> >>>>>>>>>>>>>>>>> > PTransform(Convert Row to SourceDescriptor) -> >> >> >>> >>>>>>>>>>>>>>>>> > PCollection<SourceDescriptor> -> >> >> >>> >>>>>>>>>>>>>>>>> > PTransform(ReadAll) -> PCollection<Output>* >> >> >>> >>>>>>>>>>>>>>>>> > * note that PTransform(Convert Row to >> >> >>> >>>>>>>>>>>>>>>>> > SourceDescriptor) only exists since we haven't >> >> >>> >>>>>>>>>>>>>>>>> > solved how to use schemas with language bound >> >> >>> >>>>>>>>>>>>>>>>> > types in a cross language way. SchemaCoder isn't >> >> >>> >>>>>>>>>>>>>>>>> > portable but RowCoder is which is why the >> >> >>> >>>>>>>>>>>>>>>>> > conversion step exists. We could have a solution >> >> >>> >>>>>>>>>>>>>>>>> > for this at some point in time. >> >> >>> >>>>>>>>>>>>>>>>> > >> >> >>> >>>>>>>>>>>>>>>>> > My concern with using Read was around: >> >> >>> >>>>>>>>>>>>>>>>> > a) Do all properties set on a Read apply to the >> >> >>> >>>>>>>>>>>>>>>>> > ReadAll? For example, the Kafka Read >> >> >>> >>>>>>>>>>>>>>>>> > implementation allows you to set the key and >> >> >>> >>>>>>>>>>>>>>>>> > value deserializers which are also used to >> >> >>> >>>>>>>>>>>>>>>>> > dictate the output PCollection type. 
It also >> >> >>> >>>>>>>>>>>>>>>>> > allows you to set how the watermark should be >> >> >>> >>>>>>>>>>>>>>>>> > computed. Technically a user may want the >> >> >>> >>>>>>>>>>>>>>>>> > watermark computation to be configurable per Read >> >> >>> >>>>>>>>>>>>>>>>> > and they may also want an output type which is >> >> >>> >>>>>>>>>>>>>>>>> > polymorphic (e.g. PCollection<Serializable>). >> >> >>> >>>>>>>>>>>>>>>>> > b) Read extends PTransform which brings its own >> >> >>> >>>>>>>>>>>>>>>>> > object modelling concerns. >> >> >>> >>>>>>>>>>>>>>>>> > >> >> >>> >>>>>>>>>>>>>>>>> > During the implementations of >> >> >>> >>>>>>>>>>>>>>>>> > ReadAll(PCollection<Read>), was it discovered >> >> >>> >>>>>>>>>>>>>>>>> > that some properties became runtime errors or >> >> >>> >>>>>>>>>>>>>>>>> > were ignored if they were set? If no, then the >> >> >>> >>>>>>>>>>>>>>>>> > code deduplication is likely worth it because we >> >> >>> >>>>>>>>>>>>>>>>> > also get a lot of javadoc deduplication, but if >> >> >>> >>>>>>>>>>>>>>>>> > yes is this an acceptable user experience? >> >> >>> >>>>>>>>>>>>>>>>> > >> >> >>> >>>>>>>>>>>>>>>>> > >> >> >>> >>>>>>>>>>>>>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko >> >> >>> >>>>>>>>>>>>>>>>> > <[email protected]> wrote: >> >> >>> >>>>>>>>>>>>>>>>> >> >> >> >>> >>>>>>>>>>>>>>>>> >> I believe that the initial goal of unifying >> >> >>> >>>>>>>>>>>>>>>>> >> ReadAll as a general >> >> >>> >>>>>>>>>>>>>>>>> >> "PTransform<PCollection<Read>, >> >> >>> >>>>>>>>>>>>>>>>> >> PCollection<OutputType>>” was to reduce the >> >> >>> >>>>>>>>>>>>>>>>> >> amount of code duplication and error-prone >> >> >>> >>>>>>>>>>>>>>>>> >> approach related to this. It makes much sense >> >> >>> >>>>>>>>>>>>>>>>> >> since usually we have all needed configuration >> >> >>> >>>>>>>>>>>>>>>>> >> set in Read objects and, as Ismaeil mentioned, >> >> >>> >>>>>>>>>>>>>>>>> >> ReadAll will consist mostly of only >> >> >>> >>>>>>>>>>>>>>>>> >> Split-Shuffle-Read stages. So this case usually >> >> >>> >>>>>>>>>>>>>>>>> >> can be unified by using PCollection<Read> as >> >> >>> >>>>>>>>>>>>>>>>> >> input. >> >> >>> >>>>>>>>>>>>>>>>> >> >> >> >>> >>>>>>>>>>>>>>>>> >> On the other hand, we have another need to use >> >> >>> >>>>>>>>>>>>>>>>> >> Java IOs as cross-language transforms (as Luke >> >> >>> >>>>>>>>>>>>>>>>> >> described) which seems only partly in common >> >> >>> >>>>>>>>>>>>>>>>> >> with previous pattern of ReadAll using. >> >> >>> >>>>>>>>>>>>>>>>> >> >> >> >>> >>>>>>>>>>>>>>>>> >> I’d be more in favour to have only one concept >> >> >>> >>>>>>>>>>>>>>>>> >> of read configuration for all needs but seems >> >> >>> >>>>>>>>>>>>>>>>> >> it’s not easy and I’d be more in favour with >> >> >>> >>>>>>>>>>>>>>>>> >> Luke and Boyuan approach with schema. Though, >> >> >>> >>>>>>>>>>>>>>>>> >> maybe ReadAll is not a very suitable name in >> >> >>> >>>>>>>>>>>>>>>>> >> this case because it will can bring some >> >> >>> >>>>>>>>>>>>>>>>> >> confusions related to previous pattern of >> >> >>> >>>>>>>>>>>>>>>>> >> ReadAll uses. >> >> >>> >>>>>>>>>>>>>>>>> >> >> >> >>> >>>>>>>>>>>>>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang >> >> >>> >>>>>>>>>>>>>>>>> >> <[email protected]> wrote: >> >> >>> >>>>>>>>>>>>>>>>> >> >> >> >>> >>>>>>>>>>>>>>>>> >> Sorry for the typo. I mean I think we can go >> >> >>> >>>>>>>>>>>>>>>>> >> with (3) and (4): use the data type that is >> >> >>> >>>>>>>>>>>>>>>>> >> schema-aware as the input of ReadAll. 
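For reference, a rough sketch of what such a schema-aware descriptor can look like; the class name and fields here are invented for illustration, the real Kafka one being the KafkaSourceDescriptor discussed elsewhere in the thread:

  import com.google.auto.value.AutoValue;
  import org.apache.beam.sdk.schemas.AutoValueSchema;
  import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

  @DefaultSchema(AutoValueSchema.class) // lets Beam infer a Schema, so the coder is a SchemaCoder
  @AutoValue
  public abstract class MySourceDescriptor {
    public abstract String getTopic();
    public abstract int getPartition();
    public abstract long getStartOffset();

    public static MySourceDescriptor of(String topic, int partition, long startOffset) {
      return new AutoValue_MySourceDescriptor(topic, partition, startOffset);
    }
  }

Because the element type has a schema, it can cross language boundaries as Rows, and a PCollection<Row> coming from another SDK can be converted back with Convert.fromRows(MySourceDescriptor.class).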
>> >> >>> >>>>>>>>>>>>>>>>> >> >> >> >>> >>>>>>>>>>>>>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang >> >> >>> >>>>>>>>>>>>>>>>> >> <[email protected]> wrote: >> >> >>> >>>>>>>>>>>>>>>>> >>> >> >> >>> >>>>>>>>>>>>>>>>> >>> Thanks for the summary, Cham! >> >> >>> >>>>>>>>>>>>>>>>> >>> >> >> >>> >>>>>>>>>>>>>>>>> >>> I think we can go with (2) and (4): use the >> >> >>> >>>>>>>>>>>>>>>>> >>> data type that is schema-aware as the input of >> >> >>> >>>>>>>>>>>>>>>>> >>> ReadAll. >> >> >>> >>>>>>>>>>>>>>>>> >>> >> >> >>> >>>>>>>>>>>>>>>>> >>> Converting Read into ReadAll helps us to stick >> >> >>> >>>>>>>>>>>>>>>>> >>> with SDF-like IO. But only having (3) is not >> >> >>> >>>>>>>>>>>>>>>>> >>> enough to solve the problem of using ReadAll in >> >> >>> >>>>>>>>>>>>>>>>> >>> x-lang case. >> >> >>> >>>>>>>>>>>>>>>>> >>> >> >> >>> >>>>>>>>>>>>>>>>> >>> The key point of ReadAll is that the input type >> >> >>> >>>>>>>>>>>>>>>>> >>> of ReadAll should be able to cross language >> >> >>> >>>>>>>>>>>>>>>>> >>> boundaries and have compatibilities of >> >> >>> >>>>>>>>>>>>>>>>> >>> updating/downgrading. After investigating some >> >> >>> >>>>>>>>>>>>>>>>> >>> possibilities(pure java pojo with custom coder, >> >> >>> >>>>>>>>>>>>>>>>> >>> protobuf, row/schema) in Kafka usage, we find >> >> >>> >>>>>>>>>>>>>>>>> >>> that row/schema fits our needs most. Here comes >> >> >>> >>>>>>>>>>>>>>>>> >>> (4). I believe that using Read as input of >> >> >>> >>>>>>>>>>>>>>>>> >>> ReadAll makes sense in some cases, but I also >> >> >>> >>>>>>>>>>>>>>>>> >>> think not all IOs have the same need. I would >> >> >>> >>>>>>>>>>>>>>>>> >>> treat Read as a special type as long as the >> >> >>> >>>>>>>>>>>>>>>>> >>> Read is schema-aware. >> >> >>> >>>>>>>>>>>>>>>>> >>> >> >> >>> >>>>>>>>>>>>>>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara >> >> >>> >>>>>>>>>>>>>>>>> >>> Jayalath <[email protected]> wrote: >> >> >>> >>>>>>>>>>>>>>>>> >>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>> I see. So it seems like there are three >> >> >>> >>>>>>>>>>>>>>>>> >>>> options discussed so far when it comes to >> >> >>> >>>>>>>>>>>>>>>>> >>>> defining source descriptors for ReadAll type >> >> >>> >>>>>>>>>>>>>>>>> >>>> transforms >> >> >>> >>>>>>>>>>>>>>>>> >>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>> (1) Use Read PTransform as the element type of >> >> >>> >>>>>>>>>>>>>>>>> >>>> the input PCollection >> >> >>> >>>>>>>>>>>>>>>>> >>>> (2) Use a POJO that describes the source as >> >> >>> >>>>>>>>>>>>>>>>> >>>> the data element of the input PCollection >> >> >>> >>>>>>>>>>>>>>>>> >>>> (3) Provide a converter as a function to the >> >> >>> >>>>>>>>>>>>>>>>> >>>> Read transform which essentially will convert >> >> >>> >>>>>>>>>>>>>>>>> >>>> it to a ReadAll (what Eugene mentioned) >> >> >>> >>>>>>>>>>>>>>>>> >>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>> I feel like (3) is more suitable for a related >> >> >>> >>>>>>>>>>>>>>>>> >>>> set of source descriptions such as files. 
>> >> >>> >>>>>>>>>>>>>>>>> >>>> (1) will allow most code-reuse but seems like >> >> >>> >>>>>>>>>>>>>>>>> >>>> will make it hard to use the ReadAll transform >> >> >>> >>>>>>>>>>>>>>>>> >>>> as a cross-language transform and will break >> >> >>> >>>>>>>>>>>>>>>>> >>>> the separation of construction time and >> >> >>> >>>>>>>>>>>>>>>>> >>>> runtime constructs >> >> >>> >>>>>>>>>>>>>>>>> >>>> (2) could result to less code reuse if not >> >> >>> >>>>>>>>>>>>>>>>> >>>> careful but will make the transform easier to >> >> >>> >>>>>>>>>>>>>>>>> >>>> be used as a cross-language transform without >> >> >>> >>>>>>>>>>>>>>>>> >>>> additional modifications >> >> >>> >>>>>>>>>>>>>>>>> >>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>> Also, with SDF, we can create ReadAll-like >> >> >>> >>>>>>>>>>>>>>>>> >>>> transforms that are more efficient. So we >> >> >>> >>>>>>>>>>>>>>>>> >>>> might be able to just define all sources in >> >> >>> >>>>>>>>>>>>>>>>> >>>> that format and make Read transforms just an >> >> >>> >>>>>>>>>>>>>>>>> >>>> easy to use composite built on top of that (by >> >> >>> >>>>>>>>>>>>>>>>> >>>> adding a preceding Create transform). >> >> >>> >>>>>>>>>>>>>>>>> >>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>> Thanks, >> >> >>> >>>>>>>>>>>>>>>>> >>>> Cham >> >> >>> >>>>>>>>>>>>>>>>> >>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik >> >> >>> >>>>>>>>>>>>>>>>> >>>> <[email protected]> wrote: >> >> >>> >>>>>>>>>>>>>>>>> >>>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>>> I believe we do require PTransforms to be >> >> >>> >>>>>>>>>>>>>>>>> >>>>> serializable since anonymous DoFns typically >> >> >>> >>>>>>>>>>>>>>>>> >>>>> capture the enclosing PTransform. >> >> >>> >>>>>>>>>>>>>>>>> >>>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara >> >> >>> >>>>>>>>>>>>>>>>> >>>>> Jayalath <[email protected]> wrote: >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Seems like Read in PCollection<Read> refers >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> to a transform, at least here: >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353 >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> I'm in favour of separating construction >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> time transforms from execution time data >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> objects that we store in PCollections as >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Luke mentioned. Also, we don't guarantee >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> that PTransform is serializable so users >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> have the additional complexity of providing >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> a corder whenever a PTransform is used as a >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> data object. >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Also, agree with Boyuan that using simple >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Java objects that are convertible to Beam >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Rows allow us to make these transforms >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> available to other SDKs through the >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> cross-language transforms. Using transforms >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> or complex sources as data objects will >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> probably make this difficult. 
>> >> >>> >>>>>>>>>>>>>>>>> >>>>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Thanks, >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Cham >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan >> >> >>> >>>>>>>>>>>>>>>>> >>>>>> Zhang <[email protected]> wrote: >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> Hi Ismael, >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> I think the ReadAll in the IO connector >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> refers to the IO with SDF implementation >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> despite the type of input, where Read >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> refers to UnboundedSource. One major >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> pushback of using KafkaIO.Read as source >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> description is that not all configurations >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> of KafkaIO.Read are meaningful to populate >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> during execution time. Also when thinking >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> about x-lang useage, making source >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> description across language boundaries is >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> also necessary. As Luke mentioned, it's >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> quite easy to infer a Schema from an >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> AutoValue object: >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> KafkaSourceDescription.java. Then the coder >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> of this schema-aware object will be a >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> SchemaCoder. When crossing language >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> boundaries, it's also easy to convert a Row >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> into the source description: >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> Convert.fromRows. >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>> <[email protected]> wrote: >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> To provide additional context, the KafkaIO >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> ReadAll transform takes a >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> PCollection<KafkaSourceDescriptor>. This >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> KafkaSourceDescriptor is a POJO that >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> contains the configurable parameters for >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> reading from Kafka. This is different from >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> the pattern that Ismael listed because >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> they take PCollection<Read> as input and >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> the Read is the same as the Read >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> PTransform class used for the non read all >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> case. >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> The KafkaSourceDescriptor does lead to >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> duplication since parameters used to >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> configure the transform have to be copied >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> over to the source descriptor but >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> decouples how a transform is specified >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> from the object that describes what needs >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> to be done. 
I believe Ismael's point is >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> that we wouldn't need such a decoupling. >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> Another area that hasn't been discussed >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> and I believe is a non-issue is that the >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> Beam Java SDK has the most IO connectors >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> and we would want to use the IO >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> implementations within Beam Go and Beam >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> Python. This brings in its own set of >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> issues related to versioning and >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> compatibility for the wire format and how >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> one parameterizes such transforms. The >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> wire format issue can be solved with >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> either approach by making sure that the >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> cross language expansion always takes the >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> well known format (whatever it may be) and >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> converts it into >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> Read/KafkaSourceDescriptor/... object that >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> is then passed to the ReadAll transform. >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> Boyuan has been looking to make the >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> KafkaSourceDescriptor have a schema so it >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> can be represented as a row and this can >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> be done easily using the AutoValue >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> integration (I don't believe there is >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> anything preventing someone from writing a >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> schema row -> Read -> row adapter or also >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> using the AutoValue configuration if the >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> transform is also an AutoValue). >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> I would be more for the code duplication >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> and separation of concerns provided by >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> using a different object to represent the >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> contents of the PCollection from the >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> pipeline construction time PTransform. >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>> Kirpichov <[email protected]> wrote: >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> Hi Ismael, >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> Thanks for taking this on. Have you >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> considered an approach similar (or dual) >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> to FileIO.write(), where we in a sense >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> also have to configure a dynamic number >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> different IO transforms of the same type >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> (file writes)? >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> >> >> >>> >>>>>>>>>>>>>>>>> >>>>>>>>> E.g. 
On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <[email protected]> wrote:

Hi Ismael,

Thanks for taking this on. Have you considered an approach similar (or dual) to FileIO.write(), where we in a sense also have to configure a dynamic number of different IO transforms of the same type (file writes)?

E.g. how in this example we configure many aspects of many file writes:

transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
    .by(Transaction::getType)
    .via(tx -> tx.getType().toFields(tx),  // Convert the data to be written to CSVSink
        type -> new CSVSink(type.getFieldNames()))
    .to(".../path/to/")
    .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));

we could do something similar for many JdbcIO reads:

PCollection<Bar> bars;  // user-specific type from which all the read parameters can be inferred
PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
    .fromQuery(bar -> ...compute query for this bar...)
    .withMapper((bar, resultSet) -> new Moo(...))
    .withBatchSize(bar -> ...compute batch size for this bar...)
    ...etc);

On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <[email protected]> wrote:

Hello,

(my excuses for the long email but this requires context)

As part of the move from Source-based IOs to DoFn-based ones, one pattern emerged due to the composable nature of DoFn. The idea is to have a different kind of composable reads where we take a PCollection of different sorts of intermediate specifications, e.g. tables, queries, etc, for example:

JdbcIO:
ReadAll<ParameterT, OutputT> extends PTransform<PCollection<ParameterT>, PCollection<OutputT>>

RedisIO:
ReadAll extends PTransform<PCollection<String>, PCollection<KV<String, String>>>

HBaseIO:
ReadAll extends PTransform<PCollection<HBaseQuery>, PCollection<Result>>

These patterns enabled richer use cases like doing multiple queries in the same Pipeline, querying based on key patterns or querying from multiple tables at the same time, but came with some maintenance issues:

- We ended up needing to add to the ReadAll transforms the parameters for missing information, so we ended up with lots of duplicated `with` methods and error-prone code copied from the Read transforms into the ReadAll transforms.

- When you require new parameters you have to expand the input parameters of the intermediary specification into something that resembles the full `Read` definition. For example, imagine you want to read from multiple tables or servers as part of the same pipeline but this was not in the intermediate specification: you end up adding those extra methods (duplicating more code) just to get close to the full `Read` spec.

- If new parameters are added to the Read method we end up adding them systematically to the ReadAll transform too so they are taken into account.

Due to these issues I recently did a change to test a new approach that is simpler, more complete and maintainable. The code became:

HBaseIO:
ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>

With this approach users gain the benefit of improvements to the parameters of the normal Read because they have the full set of Read parameters available. But of course there are some minor caveats:

1. You need to push some information into the normal Reads, for example partition boundaries information or Restriction information (in the SDF case). Notice that this consistent approach of ReadAll produces a simple pattern that ends up being almost reusable between IOs (e.g. the non-SDF case; a sketch of these DoFns follows this message):

public static class ReadAll extends PTransform<PCollection<Read>, PCollection<SolrDocument>> {
  @Override
  public PCollection<SolrDocument> expand(PCollection<Read> input) {
    return input
        .apply("Split", ParDo.of(new SplitFn()))
        .apply("Reshuffle", Reshuffle.viaRandomKey())
        .apply("Read", ParDo.of(new ReadFn()));
  }
}

2. If you are using generic types for the results of ReadAll you must have the Coders used in its definition and require consistent types from the data sources; in practice this means we need to add extra withCoder method(s) on ReadAll, but not the full specs.

At the moment HBaseIO and SolrIO already follow this ReadAll pattern. RedisIO and CassandraIO already have WIP PRs to do so. So I wanted to bring this subject to the mailing list to see your opinions, and if you see any sort of issues that we might be missing with this idea.

Also I would like to see if we have consensus to start using consistently the terminology of ReadAll transforms based on Read and the readAll() method for new IOs (at this point probably doing this in the only remaining inconsistent place, JdbcIO, might not be a good idea, but apart from this we should be ok).

I mention this because the recent PR on KafkaIO based on SDF is doing something similar to the old pattern but being called ReadAll, and maybe it is worth being consistent for the benefit of users.

Regards,
Ismaël
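As a rough illustration of the Split/Reshuffle/Read shape in point 1 of the message above, the two DoFns could look something like the sketch below; the helper and client names are hypothetical placeholders, not the actual SolrIO code:

import org.apache.beam.sdk.transforms.DoFn;

// Expands one full Read spec into one Read per shard/partition (the splitting logic is hypothetical).
static class SplitFn extends DoFn<Read, Read> {
  @ProcessElement
  public void processElement(@Element Read read, OutputReceiver<Read> out) {
    for (Read shard : splitIntoShards(read)) {  // hypothetical helper
      out.output(shard);
    }
  }
}

// Executes one already-split Read spec and emits the records it yields.
static class ReadFn extends DoFn<Read, SolrDocument> {
  @ProcessElement
  public void processElement(@Element Read read, OutputReceiver<SolrDocument> out) throws Exception {
    try (MyClient client = MyClient.connect(read)) {  // hypothetical client
      for (SolrDocument doc : client.run(read)) {
        out.output(doc);
      }
    }
  }
}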
On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <[email protected]> wrote:

I had mentioned that approach 1 and approach 2 work for cross language. The difference being that the cross language transform would take a well known definition and convert it to the Read transform. A normal user would have a pipeline that would look like:

1: PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
2: PCollection<SourceDescriptor> -> PTransform(ReadAll) -> PCollection<Output>

And in the cross language case this would look like:

1: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to Read) -> PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
2: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to SourceDescriptor) -> PCollection<SourceDescriptor> -> PTransform(ReadAll) -> PCollection<Output>*

* note that PTransform(Convert Row to SourceDescriptor) only exists since we haven't solved how to use schemas with language bound types in a cross language way. SchemaCoder isn't portable but RowCoder is, which is why the conversion step exists. We could have a solution for this at some point in time.

My concern with using Read was around:
a) Do all properties set on a Read apply to the ReadAll? For example, the Kafka Read implementation allows you to set the key and value deserializers which are also used to dictate the output PCollection type. It also allows you to set how the watermark should be computed. Technically a user may want the watermark computation to be configurable per Read and they may also want an output type which is polymorphic (e.g. PCollection<Serializable>).
b) Read extends PTransform which brings its own object modelling concerns.

During the implementations of ReadAll(PCollection<Read>), was it discovered that some properties became runtime errors or were ignored if they were set? If no, then the code deduplication is likely worth it because we also get a lot of javadoc deduplication, but if yes, is this an acceptable user experience?
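As a small sketch of the conversion step in the cross-language shapes above, assuming the hypothetical schema-aware MySourceDescriptor sketched earlier; Convert.fromRows is the Beam schema transform mentioned elsewhere in this thread:

import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// Rows arriving across the language boundary are converted back into the language-bound,
// schema-aware descriptor type before being handed to the ReadAll transform.
static PCollection<MySourceDescriptor> fromPortableRows(PCollection<Row> rows) {
  return rows.apply(Convert.fromRows(MySourceDescriptor.class));
}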
On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <[email protected]> wrote:

I believe that the initial goal of unifying ReadAll as a general "PTransform<PCollection<Read>, PCollection<OutputType>>" was to reduce the amount of code duplication and the error-prone approach related to this. It makes much sense since usually we have all the needed configuration set in Read objects and, as Ismaël mentioned, ReadAll will consist mostly of only Split-Shuffle-Read stages. So this case usually can be unified by using PCollection<Read> as input.

On the other hand, we have another need to use Java IOs as cross-language transforms (as Luke described) which seems only partly in common with the previous pattern of using ReadAll.

I'd be more in favour of having only one concept of read configuration for all needs, but it seems that is not easy, so I'd lean towards Luke and Boyuan's approach with schema. Though maybe ReadAll is not a very suitable name in this case because it can bring some confusion related to the previous pattern of ReadAll uses.

On 25 Jun 2020, at 05:00, Boyuan Zhang <[email protected]> wrote:

Sorry for the typo. I mean I think we can go with (3) and (4): use the data type that is schema-aware as the input of ReadAll.

On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <[email protected]> wrote:

Thanks for the summary, Cham!

I think we can go with (2) and (4): use the data type that is schema-aware as the input of ReadAll.

Converting Read into ReadAll helps us to stick with SDF-like IO. But only having (3) is not enough to solve the problem of using ReadAll in the x-lang case.

The key point of ReadAll is that the input type of ReadAll should be able to cross language boundaries and have compatibility for updating/downgrading. After investigating some possibilities (pure Java POJO with custom coder, protobuf, row/schema) in the Kafka usage, we found that row/schema fits our needs most. Here comes (4). I believe that using Read as the input of ReadAll makes sense in some cases, but I also think not all IOs have the same need. I would treat Read as a special type as long as the Read is schema-aware.
On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <[email protected]> wrote:

I see. So it seems like there are three options discussed so far when it comes to defining source descriptors for ReadAll type transforms:

(1) Use the Read PTransform as the element type of the input PCollection
(2) Use a POJO that describes the source as the data element of the input PCollection
(3) Provide a converter as a function to the Read transform which essentially will convert it to a ReadAll (what Eugene mentioned)

I feel like (3) is more suitable for a related set of source descriptions such as files.
(1) will allow the most code reuse but seems like it will make it hard to use the ReadAll transform as a cross-language transform and will break the separation of construction-time and runtime constructs.
(2) could result in less code reuse if not careful but will make the transform easier to use as a cross-language transform without additional modifications.

Also, with SDF, we can create ReadAll-like transforms that are more efficient. So we might be able to just define all sources in that format and make Read transforms just an easy-to-use composite built on top of that (by adding a preceding Create transform).

Thanks,
Cham

On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <[email protected]> wrote:

I believe we do require PTransforms to be serializable since anonymous DoFns typically capture the enclosing PTransform.
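As a minimal illustration of that capture (the transform and field names are hypothetical, not from any Beam IO): an anonymous DoFn is an inner class, so it holds an implicit reference to the enclosing PTransform, and serializing the DoFn therefore requires the PTransform to be serializable too.

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class MyTransform extends PTransform<PCollection<String>, PCollection<String>> {
  private final String prefix = "row-";

  @Override
  public PCollection<String> expand(PCollection<String> input) {
    // The anonymous DoFn below captures MyTransform.this (it reads 'prefix'),
    // so MyTransform must be serializable for the DoFn to be serialized.
    return input.apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(prefix + c.element());
      }
    }));
  }
}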
On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <[email protected]> wrote:

Seems like Read in PCollection<Read> refers to a transform, at least here:
https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353

I'm in favour of separating construction time transforms from execution time data objects that we store in PCollections, as Luke mentioned. Also, we don't guarantee that PTransform is serializable, so users have the additional complexity of providing a coder whenever a PTransform is used as a data object.

Also, agree with Boyuan that using simple Java objects that are convertible to Beam Rows allows us to make these transforms available to other SDKs through cross-language transforms. Using transforms or complex sources as data objects will probably make this difficult.

Thanks,
Cham
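As an illustration of that extra step (MyIO and its methods are hypothetical placeholders, and this assumes the Read is Java-serializable): since a Read has no default coder, the user has to supply one when building the PCollection of Reads.

import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

// There is no default coder for a Read, so one is provided explicitly when building
// the PCollection<Read> that feeds ReadAll.
PCollection<MyIO.Read> reads =
    pipeline.apply(
        Create.of(
                MyIO.read().withQuery("SELECT * FROM table_a"),
                MyIO.read().withQuery("SELECT * FROM table_b"))
            .withCoder(SerializableCoder.of(MyIO.Read.class)));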
On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <[email protected]> wrote:

Hi Ismael,

I think the ReadAll in the IO connector refers to the IO with SDF implementation regardless of the type of input, where Read refers to UnboundedSource. One major pushback on using KafkaIO.Read as the source description is that not all configurations of KafkaIO.Read are meaningful to populate during execution time. Also, when thinking about x-lang usage, making the source description work across language boundaries is also necessary. As Luke mentioned, it's quite easy to infer a Schema from an AutoValue object: KafkaSourceDescription.java. Then the coder of this schema-aware object will be a SchemaCoder. When crossing language boundaries, it's also easy to convert a Row into the source description: Convert.fromRows.

On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <[email protected]> wrote:

To provide additional context, the KafkaIO ReadAll transform takes a PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is a POJO that contains the configurable parameters for reading from Kafka. This is different from the pattern that Ismael listed because those take PCollection<Read> as input and the Read is the same Read PTransform class used for the non-ReadAll case.

The KafkaSourceDescriptor does lead to duplication since parameters used to configure the transform have to be copied over to the source descriptor, but it decouples how a transform is specified from the object that describes what needs to be done. I believe Ismael's point is that we wouldn't need such a decoupling.
