On Thu, Jul 2, 2020 at 10:26 AM Kenneth Knowles <[email protected]> wrote:
> On Wed, Jul 1, 2020 at 4:17 PM Eugene Kirpichov <[email protected]> wrote:
>> Kenn - I don't mean an enum of common closures, I mean expressing closures in a restricted sub-language such as the language of SQL expressions.
>
> My lack of clarity: enums was my phrasing of Luke's item 1). I understood what you meant. I think either a set of well-known closures or a tiny sublanguage could add value.
>
>> That would only work if there is a portable way to interpret SQL expressions, but if there isn't, maybe there should be - for the sake of, well, expressing closures portably. Of course these would be closures that only work with rows - but that seems powerful enough for many if not most purposes.
>
> You can choose a SQL dialect or choose the tiniest subset just for this purpose and go with it. But when the data type going in or out of the lambda is e.g. some Java or Python object, then what? One idea is to always require these to be rows. But if you can really get away with a dependency-free, context-free lambda, then Javascript or Python is as doable as SQL in terms of having a tiny restricted language for just this purpose. I would expect that once it got used, folks would start to ask to include the rest of what the language has to offer - its ecosystem. This is always the main design point I am interested in for "lightweight" embedded UDF proposals.

This is getting off the topic of ReadAll, but I think being able to do arbitrary computation in the preceding/succeeding transform, plus a (quite) restricted language in the transform itself, can go a long way. (For example, one could have a dynamic destinations write that takes a KV<element, dest> where dest is a format string like "foo-{{shard_num}}.txt" to plug in the truly dynamic pieces, but the dest string itself can be computed, e.g. based on the element, using arbitrary code in the caller language. A rough sketch of this idea follows below.)

> Kenn
>
>> For example, maybe the Java example:
>>
>> PCollection<BankTransaction> transactions = ...;
>> transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
>>     .by(Transaction::getType)
>>     .via(tx -> tx.getType().toFields(tx),  // Convert the data to be written to CSVSink
>>          type -> new CSVSink(type.getFieldNames()))
>>     .to(".../path/to/")
>>     .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
>>
>> could be written in Python as:
>>
>> transactions | fileio.write_dynamic(
>>     by="it.type",  # "it" is implicitly available in these SQL expressions as the same thing as the Java lambda argument
>>     format="it.fields",
>>     sink="CSV_SINK(it.type.field_names)",  # A bunch of preset sinks supported in every language?
>>     to=".../path/to/",
>>     naming="DEFAULT_NAMING(CONCAT(it, '-transactions'), '.csv')")
>>
>> Again, to be clear, I'm not suggesting to block what Ismael is proposing on getting this done - getting this done wouldn't be a short-term effort, but it seems potentially really nice.
>>
>> On Wed, Jul 1, 2020 at 3:19 PM Robert Burke <[email protected]> wrote:
>>> From the Go side of the table, the Go language doesn't provide a mechanism to serialize or access closure data, which means DoFns can't be functional closures. This, combined with the move to have the "Structural DoFns" be serialized using Beam Schemas, has the net result that if Go transforms are used for Cross Language, they will be configurable with a Schema of the configuration data.
>>>
>>> Of course, this just means that each language will probably provide whichever mechanisms it likes for use of its cross language transforms.
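To make the KV<element, dest> idea above concrete, here is a minimal Java sketch of the split between arbitrary upstream code and a restricted template. It reuses the Transaction type from Eugene's example; the "{{shard_num}}" placeholder and the commented-out write transform are hypothetical, not existing Beam APIs.

    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptor;
    import org.apache.beam.sdk.values.TypeDescriptors;

    // Upstream: arbitrary caller-language code computes the destination string per element.
    // Only the fixed "{{shard_num}}" placeholder is left for the write to fill in, so the
    // write transform itself needs no general-purpose closure support.
    PCollection<KV<Transaction, String>> keyed =
        transactions.apply(
            "AssignDestination",
            MapElements.into(
                    TypeDescriptors.kvs(
                        TypeDescriptor.of(Transaction.class), TypeDescriptors.strings()))
                .via((Transaction tx) ->
                    KV.of(tx, tx.getType() + "-transactions-{{shard_num}}.csv")));

    // Hypothetical write that only substitutes {{shard_num}} at execution time:
    // keyed.apply(SomeFileWrite.toTemplatedDestinations());

The design point is that everything element-specific is baked into the destination string by ordinary caller-language code, and the portable transform only has to understand one fixed placeholder rather than an arbitrary closure.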
>>> On Tue, 30 Jun 2020 at 16:07, Kenneth Knowles <[email protected]> wrote:
>>>> I don't think an enum of most common closures will work. The input types are typically generics that are made concrete by the caller who also provides the closures. I think Luke's (2) is the same idea as my "Java still assembles it [using opaque Python closures/transforms]". It seems like an approach to (3). Passing over actual code could address some cases, but libraries become the issue.
>>>>
>>>> I think it is fair to say that "WriteAll" style would involve entering unexplored territory.
>>>>
>>>> On the main topic, I think Brian has a pretty strong point and his example of type conversion lambdas is a good one. I did a quick survey and every other property I could find does seem like it fits on the Read, and most IOs have a few of these closures, for example also extracting timestamps. So maybe just a resolution convention of putting them on the ReadAll and that taking precedence. Then you would be deserializing a Read transform with insta-crash methods or some such?
>>>>
>>>> Kenn
>>>>
>>>> On Tue, Jun 30, 2020 at 10:24 AM Eugene Kirpichov <[email protected]> wrote:
>>>>> Yeah, mainly I just feel like dynamic reads and dynamic writes (and perhaps not-yet-invented similar transforms of other kinds) are tightly related - they are either very similar, or are duals of each other - so they should use the same approach. If they are using different approaches, it is a sign that either one of them is being done wrong or that we are running into a fundamental limitation of Beam (e.g. difficulty of encoding closures compared to encoding elements).
>>>>>
>>>>> But I agree with Luke that we shouldn't give up on closures. Especially with the work that has been done on schemas and SQL, I see no reason why we couldn't express closures in a portable restricted sub-language. If we can express SQL, we can express many or most use cases of dynamic reads/writes - I don't mean that we should actually use SQL (though we *could* - e.g. SQL scalar expressions seem powerful enough to express the closures appearing in most use cases of FileIO.writeDynamic), I just mean that SQL is an existence proof.
>>>>>
>>>>> (I don't want to rock the boat too much, just thought I'd chime in as this topic is dear to my heart)
>>>>>
>>>>> On Tue, Jun 30, 2020 at 9:59 AM Luke Cwik <[email protected]> wrote:
>>>>>> Kenn, I'm not too worried about closures since:
>>>>>> 1) the expansion service for a transform could have a well-defined set of closures by name that are returned as serialized objects that don't need to be interpretable by the caller
>>>>>> 2) the language could store serialized functions of another language as constants
>>>>>> 3) generic XLang function support will eventually be needed
>>>>>> but I do agree that closures do make things difficult to express vs data, which is primarily why we should prefer data over closures when possible and use closures when expressing it with data would be too cumbersome.
>>>>>> >>>>>> Brian, so far the cases that have been migrated have shown that the >>>>>> source descriptor and the Read transform are almost the same (some >>>>>> parameters that only impact pipeline construction such as coders differ). >>>>>> >>>>>> On Mon, Jun 29, 2020 at 2:33 PM Brian Hulette <[email protected]> >>>>>> wrote: >>>>>> >>>>>>> Sorry for jumping into this late and casting a vote against the >>>>>>> consensus... but I think I'd prefer standardizing on a pattern like >>>>>>> PCollection<KafkaSourceDescriptor> rather than PCollection<Read>. That >>>>>>> approach clearly separates the parameters that are allowed to vary >>>>>>> across a >>>>>>> ReadAll (the ones defined in KafkaSourceDescriptor) from the parameters >>>>>>> that should be constant (other parameters in the Read object, like >>>>>>> SerializedFunctions for type conversions, parameters for different >>>>>>> operating modes, etc...). I think it's helpful to think of the >>>>>>> parameters >>>>>>> that are allowed to vary as some "location descriptor", but I imagine IO >>>>>>> authors may want other parameters to vary across a ReadAll as well. >>>>>>> >>>>>>> To me it seems safer to let an IO author "opt-in" to a parameter >>>>>>> being dynamic at execution time. >>>>>>> >>>>>>> Brian >>>>>>> >>>>>>> On Mon, Jun 29, 2020 at 9:26 AM Eugene Kirpichov < >>>>>>> [email protected]> wrote: >>>>>>> >>>>>>>> I'd like to raise one more time the question of consistency between >>>>>>>> dynamic reads and dynamic writes, per my email at the beginning of the >>>>>>>> thread. >>>>>>>> If the community prefers ReadAll to read from Read, then should >>>>>>>> dynamicWrite's write to Write? >>>>>>>> >>>>>>>> On Mon, Jun 29, 2020 at 8:57 AM Boyuan Zhang <[email protected]> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> It seems like most of us agree on the idea that ReadAll should >>>>>>>>> read from Read. I'm going to update the Kafka ReadAll with the same >>>>>>>>> pattern. >>>>>>>>> Thanks for all your help! >>>>>>>>> >>>>>>>>> On Fri, Jun 26, 2020 at 12:12 PM Chamikara Jayalath < >>>>>>>>> [email protected]> wrote: >>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Fri, Jun 26, 2020 at 11:49 AM Luke Cwik <[email protected]> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> I would also like to suggest that transforms that implement >>>>>>>>>>> ReadAll via Read should also provide methods like: >>>>>>>>>>> >>>>>>>>>>> // Uses the specified values if unspecified in the input element >>>>>>>>>>> from the PCollection<Read>. >>>>>>>>>>> withDefaults(Read read); >>>>>>>>>>> // Uses the specified values regardless of what the input >>>>>>>>>>> element from the PCollection<Read> specifies. >>>>>>>>>>> withOverrides(Read read); >>>>>>>>>>> >>>>>>>>>>> and only adds methods that are required at construction time >>>>>>>>>>> (e.g. coders). This way the majority of documentation sits on the >>>>>>>>>>> Read >>>>>>>>>>> transform. >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> +0 from me. Sounds like benefits outweigh the drawbacks here and >>>>>>>>>> some of the drawbacks related to cross-language can be overcome >>>>>>>>>> through >>>>>>>>>> future advancements. >>>>>>>>>> Thanks for bringing this up Ismaël. >>>>>>>>>> >>>>>>>>>> - Cham >>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Fri, Jun 26, 2020 at 9:58 AM Luke Cwik <[email protected]> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Ismael, it is good to hear that using Read as the input didn't >>>>>>>>>>>> have a bunch of parameters that were being skipped/ignored. 
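Going back to Luke's withDefaults(Read) / withOverrides(Read) suggestion quoted above, a hedged sketch of how a ReadAll-via-Read transform might apply that precedence rule is below. FooIO naming, the Read/Result/SplitFn/ReadFn types and the merge(...) helper are placeholders, not an existing Beam API; the only point is the ordering: the input element wins over the defaults, and the overrides win over the element.

    // Hypothetical ReadAll sketch; merge(base, other) is assumed to take other's value
    // for every field it sets and fall back to base otherwise.
    public static class ReadAll extends PTransform<PCollection<Read>, PCollection<Result>> {
      private Read defaults;   // used only for fields the input element leaves unset
      private Read overrides;  // always wins over the input element

      public ReadAll withDefaults(Read read) { this.defaults = read; return this; }
      public ReadAll withOverrides(Read read) { this.overrides = read; return this; }

      @Override
      public PCollection<Result> expand(PCollection<Read> input) {
        final Read defaults = this.defaults;
        final Read overrides = this.overrides;
        return input
            .apply("ResolveDefaultsAndOverrides",
                MapElements.into(TypeDescriptor.of(Read.class))
                    .via((Read element) -> merge(merge(defaults, element), overrides)))
            // ...followed by the usual stages of the ReadAll pattern:
            .apply("Split", ParDo.of(new SplitFn()))
            .apply("Reshuffle", Reshuffle.viaRandomKey())
            .apply("Read", ParDo.of(new ReadFn()));
      }
    }

This keeps the bulk of the documentation on the Read transform, as suggested, while the ReadAll only adds the merge step and whatever is genuinely construction-time (e.g. coders).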
Also, >>>>>>>>>>>> for the >>>>>>>>>>>> polymorphism issue you have to rely on the user correctly telling >>>>>>>>>>>> you the >>>>>>>>>>>> type in such a way where it is a common ancestor of all the >>>>>>>>>>>> runtime types >>>>>>>>>>>> that will ever be used. This usually boils down to something like >>>>>>>>>>>> Serializable or DynamicMessage such that the coder that is chosen >>>>>>>>>>>> works for >>>>>>>>>>>> all the runtime types. Using multiple types is a valid use case >>>>>>>>>>>> and would >>>>>>>>>>>> allow for a simpler graph with less flattens merging the output >>>>>>>>>>>> from >>>>>>>>>>>> multiple sources. >>>>>>>>>>>> >>>>>>>>>>>> Boyuan, as you have mentioned we can have a coder for >>>>>>>>>>>> KafkaIO.Read which uses schemas even if some of the parameters >>>>>>>>>>>> can't be >>>>>>>>>>>> represented in a meaningful way beyond "bytes". This would be >>>>>>>>>>>> helpful for >>>>>>>>>>>> cross language as well since every parameter would become >>>>>>>>>>>> available if a >>>>>>>>>>>> language could support it (e.g. it could serialize a java function >>>>>>>>>>>> up front >>>>>>>>>>>> and keep it saved as raw bytes within said language). Even if we >>>>>>>>>>>> figure out >>>>>>>>>>>> a better way to do this in the future, we'll have to change the >>>>>>>>>>>> schema for >>>>>>>>>>>> the new way anyway. This would mean that the external version of >>>>>>>>>>>> the >>>>>>>>>>>> transform adopts Row to Read and we drop KafkaSourceDescriptor. The >>>>>>>>>>>> conversion from Row to Read could validate that the parameters >>>>>>>>>>>> make sense >>>>>>>>>>>> (e.g. the bytes are valid serialized functions). The addition of an >>>>>>>>>>>> endReadTime/endReadOffset would make sense for KafkaIO.Read as >>>>>>>>>>>> well and >>>>>>>>>>>> this would enable having a bounded version that could be used for >>>>>>>>>>>> backfills >>>>>>>>>>>> (this doesn't have to be done as part of any current ongoing PR). >>>>>>>>>>>> Essentially any parameter that could be added for a single >>>>>>>>>>>> instance of a >>>>>>>>>>>> Kafka element+restriction would also make sense to the KafkaIO.Read >>>>>>>>>>>> transform since it too is a single instance. There are parameters >>>>>>>>>>>> that >>>>>>>>>>>> would apply to the ReadAll that wouldn't apply to a read and these >>>>>>>>>>>> would be >>>>>>>>>>>> global parameters across all element+restriction pairs such as >>>>>>>>>>>> config >>>>>>>>>>>> overrides or default values. >>>>>>>>>>>> >>>>>>>>>>>> I am convinced that we should do as Ismael is suggesting and >>>>>>>>>>>> use KafkaIO.Read as the type. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:00 PM Chamikara Jayalath < >>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Discussion regarding cross-language transforms is a slight >>>>>>>>>>>>> tangent here. But I think, in general, it's great if we can use >>>>>>>>>>>>> existing >>>>>>>>>>>>> transforms (for example, IO connectors) as cross-language >>>>>>>>>>>>> transforms >>>>>>>>>>>>> without having to build more composites (irrespective of whether >>>>>>>>>>>>> in >>>>>>>>>>>>> ExternalTransformBuilders or a user pipelines) just to make them >>>>>>>>>>>>> cross-language compatible. A future cross-language compatible >>>>>>>>>>>>> SchemaCoder >>>>>>>>>>>>> might help (assuming that works for Read transform) but I'm not >>>>>>>>>>>>> sure we >>>>>>>>>>>>> have a good idea when we'll get to that state. 
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang <[email protected]> wrote:
>>>>>>>>>>>>>> For unbounded SDF in Kafka, we also consider the upgrading/downgrading compatibility in the pipeline update scenario (for detailed discussion, please refer to https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E). In order to obtain the compatibility, it requires the input of the read SDF to be schema-aware.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thus the major constraint of mapping KafkaSourceDescriptor to PCollection<Read> is that the KafkaIO.Read also needs to be schema-aware, otherwise pipeline updates might fail unnecessarily. If looking into KafkaIO.Read, not all necessary fields are compatible with schema, for example, SerializedFunction.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm kind of confused by why ReadAll<Read, OutputT> is a common pattern for SDF based IO. The Read can be a common pattern because the input is always a PBegin. But for an SDF based IO, the input can be anything. By using Read as input, we will still have the maintenance cost when SDF IO supports a new field but Read doesn't consume it. For example, we are discussing adding endOffset and endReadTime to KafkaSourceDescriptor, which is not used in KafkaIO.Read.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía <[email protected]> wrote:
>>>>>>>>>>>>>>> We forgot to mention (5) External.Config used in cross-lang, see KafkaIO ExternalTransformBuilder. This approach is the predecessor of (4) and probably a really good candidate to be replaced by the Row based Configuration Boyuan is envisioning (so good to be aware of this).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the clear explanation Luke, you mention the real issue(s). All the approaches discussed so far in the end could be easily transformed to produce a PCollection<Read>, and those Read elements could be read by the generic ReadAll transform. Notice that this can be internal in some IOs e.g. KafkaIO if they decide not to expose it. I am not saying that we should force every IO to support ReadAll in its public API, but if we do it is probably a good idea to be consistent with naming the transform that expects an input PCollection<Read> in the same way. Also notice that using it will save us from the maintenance issues discussed in my previous email.
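As a side note on Boyuan's schema-aware requirement above: the thread points at KafkaSourceDescription.java and Beam's AutoValue schema inference. A minimal sketch of such a descriptor might look like this; the class and field names are illustrative, not the actual KafkaIO code. Every field is a plain Row-representable type, which is what buys update and cross-language compatibility, and closure-typed settings such as SerializedFunction deliberately have no place in it.

    import com.google.auto.value.AutoValue;
    import org.apache.beam.sdk.schemas.AutoValueSchema;
    import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
    import org.checkerframework.checker.nullness.qual.Nullable;

    // Illustrative schema-aware source descriptor (not the real KafkaSourceDescription):
    // Beam infers a Schema from the AutoValue class, so elements get a SchemaCoder and
    // can be represented as Rows across SDKs and across pipeline updates.
    @DefaultSchema(AutoValueSchema.class)
    @AutoValue
    public abstract class ExampleSourceDescriptor {
      public abstract String getTopic();
      public abstract Integer getPartition();
      public abstract @Nullable Long getStartOffset();
      public abstract @Nullable Long getEndOffset();         // e.g. for bounded backfills
      public abstract @Nullable Long getEndReadTimeMillis(); // e.g. endReadTime as millis

      public static ExampleSourceDescriptor of(String topic, int partition) {
        return new AutoValue_ExampleSourceDescriptor(topic, partition, null, null, null);
      }
    }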
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Back to the main concern: the consequences of expansion >>>>>>>>>>>>>>> based on Read: So far I >>>>>>>>>>>>>>> have not seen consequences for the Splitting part which maps >>>>>>>>>>>>>>> really nice >>>>>>>>>>>>>>> assuming the Partition info / Restriction is available as >>>>>>>>>>>>>>> part of Read. So far >>>>>>>>>>>>>>> there are not Serialization because Beam is already >>>>>>>>>>>>>>> enforcing this. Notice that >>>>>>>>>>>>>>> ReadAll expansion is almost ‘equivalent’ to a poor man SDF >>>>>>>>>>>>>>> at least for the >>>>>>>>>>>>>>> Bounded case (see the code in my previous email). For the >>>>>>>>>>>>>>> other points: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> > a) Do all properties set on a Read apply to the ReadAll? >>>>>>>>>>>>>>> For example, the >>>>>>>>>>>>>>> > Kafka Read implementation allows you to set the key and >>>>>>>>>>>>>>> value deserializers >>>>>>>>>>>>>>> > which are also used to dictate the output PCollection >>>>>>>>>>>>>>> type. It also allows you >>>>>>>>>>>>>>> > to set how the watermark should be computed. Technically a >>>>>>>>>>>>>>> user may want the >>>>>>>>>>>>>>> > watermark computation to be configurable per Read and they >>>>>>>>>>>>>>> may also want an >>>>>>>>>>>>>>> > output type which is polymorphic (e.g. >>>>>>>>>>>>>>> Pcollection<Serializable>). >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Most of the times they do but for parametric types we cannot >>>>>>>>>>>>>>> support different >>>>>>>>>>>>>>> types in the outputs of the Read or at least I did not find >>>>>>>>>>>>>>> how to do so (is >>>>>>>>>>>>>>> there a way to use multiple output Coders on Beam?), we saw >>>>>>>>>>>>>>> this in CassandraIO >>>>>>>>>>>>>>> and we were discussing adding explicitly these Coders or >>>>>>>>>>>>>>> Serializer >>>>>>>>>>>>>>> specific methods to the ReadAll transform. This is less nice >>>>>>>>>>>>>>> because it will >>>>>>>>>>>>>>> imply some repeated methods, but it is still a compromise to >>>>>>>>>>>>>>> gain the other >>>>>>>>>>>>>>> advantages. I suppose the watermark case you mention is >>>>>>>>>>>>>>> similar because you may >>>>>>>>>>>>>>> want the watermark to behave differently in each Read and we >>>>>>>>>>>>>>> probably don’t >>>>>>>>>>>>>>> support this, so it corresponds to the polymorphic category. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> > b) Read extends PTransform which brings its own object >>>>>>>>>>>>>>> modelling concerns. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> > During the implementations of ReadAll(PCollection<Read>), >>>>>>>>>>>>>>> was it discovered >>>>>>>>>>>>>>> > that some properties became runtime errors or were ignored >>>>>>>>>>>>>>> if they were set? >>>>>>>>>>>>>>> > If no, then the code deduplication is likely worth it >>>>>>>>>>>>>>> because we also get a >>>>>>>>>>>>>>> > lot of javadoc deduplication, but if yes is this an >>>>>>>>>>>>>>> acceptable user >>>>>>>>>>>>>>> > experience? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> No, not so far. This is an interesting part, notice that the >>>>>>>>>>>>>>> Read translation >>>>>>>>>>>>>>> ends up delegating the read bits to the ReadFn part of >>>>>>>>>>>>>>> ReadAll so the ReadFn is >>>>>>>>>>>>>>> the real read and must be aware and use all the parameters. 
>>>>>>>>>>>>>>> @Override
>>>>>>>>>>>>>>> public PCollection<SolrDocument> expand(PBegin input) {
>>>>>>>>>>>>>>>   return input.apply("Create", Create.of(this)).apply("ReadAll", readAll());
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I might be missing something for the Unbounded SDF case, which is the only case we have not explored so far. I think one easy way to see the limitations would be, in the ongoing KafkaIO SDF based implementation, to try to map KafkaSourceDescriptor to do the extra PCollection<Read> and the Read logic on the ReadAll with the SDF, to see which constraints we hit; the polymorphic ones will be there for sure, maybe others will appear (not sure). However it would be interesting to see if we have a real gain in the maintenance points, but well, let's not forget also that KafkaIO has a LOT of knobs so probably the generic implementation could be relatively complex.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <[email protected]> wrote:
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > I had mentioned that approach 1 and approach 2 work for cross language. The difference being that the cross language transform would take a well known definition and convert it to the Read transform. A normal user would have a pipeline that would look like:
>>>>>>>>>>>>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
>>>>>>>>>>>>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) -> PCollection<Output>
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > And in the cross language case this would look like:
>>>>>>>>>>>>>>> > 1: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to Read) -> PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
>>>>>>>>>>>>>>> > 2: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to SourceDescriptor) -> PCollection<SourceDescriptor> -> PTransform(ReadAll) -> PCollection<Output>*
>>>>>>>>>>>>>>> > * note that PTransform(Convert Row to SourceDescriptor) only exists since we haven't solved how to use schemas with language bound types in a cross language way. SchemaCoder isn't portable but RowCoder is, which is why the conversion step exists. We could have a solution for this at some point in time.
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > My concern with using Read was around:
>>>>>>>>>>>>>>> > a) Do all properties set on a Read apply to the ReadAll? For example, the Kafka Read implementation allows you to set the key and value deserializers which are also used to dictate the output PCollection type. It also allows you to set how the watermark should be computed.
>>>>>>>>>>>>>>> Technically a user may want the watermark computation to be >>>>>>>>>>>>>>> configurable >>>>>>>>>>>>>>> per Read and they may also want an output type which is >>>>>>>>>>>>>>> polymorphic (e.g. >>>>>>>>>>>>>>> PCollection<Serializable>). >>>>>>>>>>>>>>> > b) Read extends PTransform which brings its own object >>>>>>>>>>>>>>> modelling concerns. >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > During the implementations of ReadAll(PCollection<Read>), >>>>>>>>>>>>>>> was it discovered that some properties became runtime errors or >>>>>>>>>>>>>>> were >>>>>>>>>>>>>>> ignored if they were set? If no, then the code deduplication is >>>>>>>>>>>>>>> likely >>>>>>>>>>>>>>> worth it because we also get a lot of javadoc deduplication, >>>>>>>>>>>>>>> but if yes is >>>>>>>>>>>>>>> this an acceptable user experience? >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko < >>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> I believe that the initial goal of unifying ReadAll as a >>>>>>>>>>>>>>> general "PTransform<PCollection<Read>, >>>>>>>>>>>>>>> PCollection<OutputType>>” was to >>>>>>>>>>>>>>> reduce the amount of code duplication and error-prone approach >>>>>>>>>>>>>>> related to >>>>>>>>>>>>>>> this. It makes much sense since usually we have all needed >>>>>>>>>>>>>>> configuration >>>>>>>>>>>>>>> set in Read objects and, as Ismaeil mentioned, ReadAll will >>>>>>>>>>>>>>> consist mostly >>>>>>>>>>>>>>> of only Split-Shuffle-Read stages. So this case usually can be >>>>>>>>>>>>>>> unified by >>>>>>>>>>>>>>> using PCollection<Read> as input. >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> On the other hand, we have another need to use Java IOs >>>>>>>>>>>>>>> as cross-language transforms (as Luke described) which seems >>>>>>>>>>>>>>> only partly in >>>>>>>>>>>>>>> common with previous pattern of ReadAll using. >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> I’d be more in favour to have only one concept of read >>>>>>>>>>>>>>> configuration for all needs but seems it’s not easy and I’d be >>>>>>>>>>>>>>> more in >>>>>>>>>>>>>>> favour with Luke and Boyuan approach with schema. Though, maybe >>>>>>>>>>>>>>> ReadAll is >>>>>>>>>>>>>>> not a very suitable name in this case because it will can bring >>>>>>>>>>>>>>> some >>>>>>>>>>>>>>> confusions related to previous pattern of ReadAll uses. >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang < >>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> Sorry for the typo. I mean I think we can go with (3) and >>>>>>>>>>>>>>> (4): use the data type that is schema-aware as the input of >>>>>>>>>>>>>>> ReadAll. >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang < >>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>> >>> Thanks for the summary, Cham! >>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>> >>> I think we can go with (2) and (4): use the data type >>>>>>>>>>>>>>> that is schema-aware as the input of ReadAll. >>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>> >>> Converting Read into ReadAll helps us to stick with >>>>>>>>>>>>>>> SDF-like IO. But only having (3) is not enough to solve the >>>>>>>>>>>>>>> problem of >>>>>>>>>>>>>>> using ReadAll in x-lang case. >>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>> >>> The key point of ReadAll is that the input type of >>>>>>>>>>>>>>> ReadAll should be able to cross language boundaries and have >>>>>>>>>>>>>>> compatibilities of updating/downgrading. 
After investigating >>>>>>>>>>>>>>> some >>>>>>>>>>>>>>> possibilities(pure java pojo with custom coder, protobuf, >>>>>>>>>>>>>>> row/schema) in >>>>>>>>>>>>>>> Kafka usage, we find that row/schema fits our needs most. Here >>>>>>>>>>>>>>> comes (4). I >>>>>>>>>>>>>>> believe that using Read as input of ReadAll makes sense in some >>>>>>>>>>>>>>> cases, but >>>>>>>>>>>>>>> I also think not all IOs have the same need. I would treat Read >>>>>>>>>>>>>>> as a >>>>>>>>>>>>>>> special type as long as the Read is schema-aware. >>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath < >>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>> >>>> >>>>>>>>>>>>>>> >>>> I see. So it seems like there are three options >>>>>>>>>>>>>>> discussed so far when it comes to defining source descriptors >>>>>>>>>>>>>>> for ReadAll >>>>>>>>>>>>>>> type transforms >>>>>>>>>>>>>>> >>>> >>>>>>>>>>>>>>> >>>> (1) Use Read PTransform as the element type of the >>>>>>>>>>>>>>> input PCollection >>>>>>>>>>>>>>> >>>> (2) Use a POJO that describes the source as the data >>>>>>>>>>>>>>> element of the input PCollection >>>>>>>>>>>>>>> >>>> (3) Provide a converter as a function to the Read >>>>>>>>>>>>>>> transform which essentially will convert it to a ReadAll (what >>>>>>>>>>>>>>> Eugene >>>>>>>>>>>>>>> mentioned) >>>>>>>>>>>>>>> >>>> >>>>>>>>>>>>>>> >>>> I feel like (3) is more suitable for a related set of >>>>>>>>>>>>>>> source descriptions such as files. >>>>>>>>>>>>>>> >>>> (1) will allow most code-reuse but seems like will make >>>>>>>>>>>>>>> it hard to use the ReadAll transform as a cross-language >>>>>>>>>>>>>>> transform and will >>>>>>>>>>>>>>> break the separation of construction time and runtime constructs >>>>>>>>>>>>>>> >>>> (2) could result to less code reuse if not careful but >>>>>>>>>>>>>>> will make the transform easier to be used as a cross-language >>>>>>>>>>>>>>> transform >>>>>>>>>>>>>>> without additional modifications >>>>>>>>>>>>>>> >>>> >>>>>>>>>>>>>>> >>>> Also, with SDF, we can create ReadAll-like transforms >>>>>>>>>>>>>>> that are more efficient. So we might be able to just define all >>>>>>>>>>>>>>> sources in >>>>>>>>>>>>>>> that format and make Read transforms just an easy to use >>>>>>>>>>>>>>> composite built on >>>>>>>>>>>>>>> top of that (by adding a preceding Create transform). >>>>>>>>>>>>>>> >>>> >>>>>>>>>>>>>>> >>>> Thanks, >>>>>>>>>>>>>>> >>>> Cham >>>>>>>>>>>>>>> >>>> >>>>>>>>>>>>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik < >>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>> >>>>> >>>>>>>>>>>>>>> >>>>> I believe we do require PTransforms to be serializable >>>>>>>>>>>>>>> since anonymous DoFns typically capture the enclosing >>>>>>>>>>>>>>> PTransform. >>>>>>>>>>>>>>> >>>>> >>>>>>>>>>>>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath < >>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>> >>>>>> >>>>>>>>>>>>>>> >>>>>> Seems like Read in PCollection<Read> refers to a >>>>>>>>>>>>>>> transform, at least here: >>>>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353 >>>>>>>>>>>>>>> >>>>>> >>>>>>>>>>>>>>> >>>>>> I'm in favour of separating construction time >>>>>>>>>>>>>>> transforms from execution time data objects that we store in >>>>>>>>>>>>>>> PCollections >>>>>>>>>>>>>>> as Luke mentioned. 
Also, we don't guarantee that PTransform is >>>>>>>>>>>>>>> serializable >>>>>>>>>>>>>>> so users have the additional complexity of providing a corder >>>>>>>>>>>>>>> whenever a >>>>>>>>>>>>>>> PTransform is used as a data object. >>>>>>>>>>>>>>> >>>>>> Also, agree with Boyuan that using simple Java >>>>>>>>>>>>>>> objects that are convertible to Beam Rows allow us to make >>>>>>>>>>>>>>> these transforms >>>>>>>>>>>>>>> available to other SDKs through the cross-language transforms. >>>>>>>>>>>>>>> Using >>>>>>>>>>>>>>> transforms or complex sources as data objects will probably >>>>>>>>>>>>>>> make this >>>>>>>>>>>>>>> difficult. >>>>>>>>>>>>>>> >>>>>> >>>>>>>>>>>>>>> >>>>>> Thanks, >>>>>>>>>>>>>>> >>>>>> Cham >>>>>>>>>>>>>>> >>>>>> >>>>>>>>>>>>>>> >>>>>> >>>>>>>>>>>>>>> >>>>>> >>>>>>>>>>>>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang < >>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>> >>>>>>> >>>>>>>>>>>>>>> >>>>>>> Hi Ismael, >>>>>>>>>>>>>>> >>>>>>> >>>>>>>>>>>>>>> >>>>>>> I think the ReadAll in the IO connector refers to >>>>>>>>>>>>>>> the IO with SDF implementation despite the type of input, where >>>>>>>>>>>>>>> Read refers >>>>>>>>>>>>>>> to UnboundedSource. One major pushback of using KafkaIO.Read >>>>>>>>>>>>>>> as source >>>>>>>>>>>>>>> description is that not all configurations of KafkaIO.Read are >>>>>>>>>>>>>>> meaningful >>>>>>>>>>>>>>> to populate during execution time. Also when thinking about >>>>>>>>>>>>>>> x-lang useage, >>>>>>>>>>>>>>> making source description across language boundaries is also >>>>>>>>>>>>>>> necessary. As >>>>>>>>>>>>>>> Luke mentioned, it's quite easy to infer a Schema from an >>>>>>>>>>>>>>> AutoValue object: >>>>>>>>>>>>>>> KafkaSourceDescription.java. Then the coder of this >>>>>>>>>>>>>>> schema-aware object >>>>>>>>>>>>>>> will be a SchemaCoder. When crossing language boundaries, it's >>>>>>>>>>>>>>> also easy to >>>>>>>>>>>>>>> convert a Row into the source description: Convert.fromRows. >>>>>>>>>>>>>>> >>>>>>> >>>>>>>>>>>>>>> >>>>>>> >>>>>>>>>>>>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik < >>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>> >>>>>>>> >>>>>>>>>>>>>>> >>>>>>>> To provide additional context, the KafkaIO ReadAll >>>>>>>>>>>>>>> transform takes a PCollection<KafkaSourceDescriptor>. This >>>>>>>>>>>>>>> KafkaSourceDescriptor is a POJO that contains the configurable >>>>>>>>>>>>>>> parameters >>>>>>>>>>>>>>> for reading from Kafka. This is different from the pattern that >>>>>>>>>>>>>>> Ismael >>>>>>>>>>>>>>> listed because they take PCollection<Read> as input and the >>>>>>>>>>>>>>> Read is the >>>>>>>>>>>>>>> same as the Read PTransform class used for the non read all >>>>>>>>>>>>>>> case. >>>>>>>>>>>>>>> >>>>>>>> >>>>>>>>>>>>>>> >>>>>>>> The KafkaSourceDescriptor does lead to duplication >>>>>>>>>>>>>>> since parameters used to configure the transform have to be >>>>>>>>>>>>>>> copied over to >>>>>>>>>>>>>>> the source descriptor but decouples how a transform is >>>>>>>>>>>>>>> specified from the >>>>>>>>>>>>>>> object that describes what needs to be done. I believe Ismael's >>>>>>>>>>>>>>> point is >>>>>>>>>>>>>>> that we wouldn't need such a decoupling. >>>>>>>>>>>>>>> >>>>>>>> >>>>>>>>>>>>>>> >>>>>>>> Another area that hasn't been discussed and I >>>>>>>>>>>>>>> believe is a non-issue is that the Beam Java SDK has the most >>>>>>>>>>>>>>> IO connectors >>>>>>>>>>>>>>> and we would want to use the IO implementations within Beam Go >>>>>>>>>>>>>>> and Beam >>>>>>>>>>>>>>> Python. 
This brings in its own set of issues related to >>>>>>>>>>>>>>> versioning and >>>>>>>>>>>>>>> compatibility for the wire format and how one parameterizes such >>>>>>>>>>>>>>> transforms. The wire format issue can be solved with either >>>>>>>>>>>>>>> approach by >>>>>>>>>>>>>>> making sure that the cross language expansion always takes the >>>>>>>>>>>>>>> well known >>>>>>>>>>>>>>> format (whatever it may be) and converts it into >>>>>>>>>>>>>>> Read/KafkaSourceDescriptor/... object that is then passed to >>>>>>>>>>>>>>> the ReadAll >>>>>>>>>>>>>>> transform. Boyuan has been looking to make the >>>>>>>>>>>>>>> KafkaSourceDescriptor have a >>>>>>>>>>>>>>> schema so it can be represented as a row and this can be done >>>>>>>>>>>>>>> easily using >>>>>>>>>>>>>>> the AutoValue integration (I don't believe there is anything >>>>>>>>>>>>>>> preventing >>>>>>>>>>>>>>> someone from writing a schema row -> Read -> row adapter or >>>>>>>>>>>>>>> also using the >>>>>>>>>>>>>>> AutoValue configuration if the transform is also an AutoValue). >>>>>>>>>>>>>>> >>>>>>>> >>>>>>>>>>>>>>> >>>>>>>> I would be more for the code duplication and >>>>>>>>>>>>>>> separation of concerns provided by using a different object to >>>>>>>>>>>>>>> represent >>>>>>>>>>>>>>> the contents of the PCollection from the pipeline construction >>>>>>>>>>>>>>> time >>>>>>>>>>>>>>> PTransform. >>>>>>>>>>>>>>> >>>>>>>> >>>>>>>>>>>>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov < >>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>> >>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>> Hi Ismael, >>>>>>>>>>>>>>> >>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>> Thanks for taking this on. Have you considered an >>>>>>>>>>>>>>> approach similar (or dual) to FileIO.write(), where we in a >>>>>>>>>>>>>>> sense also have >>>>>>>>>>>>>>> to configure a dynamic number different IO transforms of the >>>>>>>>>>>>>>> same type >>>>>>>>>>>>>>> (file writes)? >>>>>>>>>>>>>>> >>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>> E.g. how in this example we configure many aspects >>>>>>>>>>>>>>> of many file writes: >>>>>>>>>>>>>>> >>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType, >>>>>>>>>>>>>>> Transaction>writeDynamic() >>>>>>>>>>>>>>> >>>>>>>>> .by(Transaction::getType) >>>>>>>>>>>>>>> >>>>>>>>> .via(tx -> tx.getType().toFields(tx), // >>>>>>>>>>>>>>> Convert the data to be written to CSVSink >>>>>>>>>>>>>>> >>>>>>>>> type -> new >>>>>>>>>>>>>>> CSVSink(type.getFieldNames())) >>>>>>>>>>>>>>> >>>>>>>>> .to(".../path/to/") >>>>>>>>>>>>>>> >>>>>>>>> .withNaming(type -> defaultNaming(type + >>>>>>>>>>>>>>> "-transactions", ".csv")); >>>>>>>>>>>>>>> >>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>> we could do something similar for many JdbcIO >>>>>>>>>>>>>>> reads: >>>>>>>>>>>>>>> >>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>> PCollection<Bar> bars; // user-specific type from >>>>>>>>>>>>>>> which all the read parameters can be inferred >>>>>>>>>>>>>>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, >>>>>>>>>>>>>>> Moo>readAll() >>>>>>>>>>>>>>> >>>>>>>>> .fromQuery(bar -> ...compute query for this >>>>>>>>>>>>>>> bar...) >>>>>>>>>>>>>>> >>>>>>>>> .withMapper((bar, resultSet) -> new Moo(...)) >>>>>>>>>>>>>>> >>>>>>>>> .withBatchSize(bar -> ...compute batch size for >>>>>>>>>>>>>>> this bar...) 
>>>>>>>>>>>>>>> >>>>>>>>> ...etc);
>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <[email protected]> wrote:
>>>>>>>>>>>>>>> >>>>>>>>>> Hello,
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> (my excuses for the long email but this requires context)
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> As part of the move from Source based IOs to DoFn based ones, one pattern emerged due to the composable nature of DoFn. The idea is to have a different kind of composable reads where we take a PCollection of different sorts of intermediate specifications, e.g. tables, queries, etc, for example:
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> JdbcIO:
>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> RedisIO:
>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<String>, PCollection<KV<String, String>>>
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>, PCollection<Result>>
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> These patterns enabled richer use cases like doing multiple queries in the same Pipeline, querying based on key patterns or querying from multiple tables at the same time, but came with some maintenance issues:
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> - We ended up needing to add to the ReadAll transforms the parameters for missing information, so we ended up with lots of duplicated with-methods and error-prone code copied from the Read transforms into the ReadAll transforms.
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> - When you require new parameters you have to expand the input parameters of the intermediary specification into something that resembles the full `Read` definition. For example, imagine you want to read from multiple tables or servers as part of the same pipeline but this was not in the intermediate specification: you end up adding those extra methods (duplicating more code) just to get close to being like the full Read spec.
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> - If new parameters are added to the Read method we end up adding them systematically to the ReadAll transform too so they are taken into account.
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> Due to these issues I recently did a change to test a new approach that is simpler, more complete and maintainable.
>>>>>>>>>>>>>>> >>>>>>>>>> The code became:
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> With this approach users gain the benefits of improvements on the parameters of the normal Read, because they have the full set of Read parameters available. But of course there are some minor caveats:
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> 1. You need to push some information into the normal Reads, for example partition boundaries information or Restriction information (in the SDF case). Notice that this consistent approach of ReadAll produces a simple pattern that ends up being almost reusable between IOs (e.g. the non-SDF case):
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> public static class ReadAll extends PTransform<PCollection<Read>, PCollection<SolrDocument>> {
>>>>>>>>>>>>>>> >>>>>>>>>>   @Override
>>>>>>>>>>>>>>> >>>>>>>>>>   public PCollection<SolrDocument> expand(PCollection<Read> input) {
>>>>>>>>>>>>>>> >>>>>>>>>>     return input
>>>>>>>>>>>>>>> >>>>>>>>>>         .apply("Split", ParDo.of(new SplitFn()))
>>>>>>>>>>>>>>> >>>>>>>>>>         .apply("Reshuffle", Reshuffle.viaRandomKey())
>>>>>>>>>>>>>>> >>>>>>>>>>         .apply("Read", ParDo.of(new ReadFn()));
>>>>>>>>>>>>>>> >>>>>>>>>>   }
>>>>>>>>>>>>>>> >>>>>>>>>> }
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> 2. If you are using generic types for the ReadAll results you must have the Coders used in its definition and require consistent types from the data sources; in practice this means we need to add extra withCoder method(s) on ReadAll, but not the full specs.
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow this ReadAll pattern. RedisIO and CassandraIO have already WIP PRs to do so. So I wanted to bring this subject to the mailing list to see your opinions, and if you see any sort of issues that we might be missing with this idea.
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> Also I would like to see if we have consensus to start using consistently the terminology of ReadAll transforms based on Read and the readAll() method for new IOs (at this point probably outdoing this in the only remaining inconsistent place in JdbcIO might not be a good idea, but apart from this we should be ok).
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> I mention this because the recent PR on KafkaIO based on SDF is doing something similar to the old pattern but being called ReadAll, and maybe it is worth to be consistent for the benefit of users.
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> Regards,
>>>>>>>>>>>>>>> >>>>>>>>>> Ismaël
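To illustrate caveat 2 from Ismaël's message above (a generic output type means the Coder cannot be inferred, so the ReadAll grows an explicit withCoder method even though the rest of the configuration stays on the Read elements), here is a hedged sketch of the generic form of the same pattern. The Read<T>, SplitFn<T> and ReadFn<T> names are placeholders for an IO's own types, not an existing Beam API.

    // Hypothetical generic ReadAll: only the output coder is supplied at construction
    // time; everything else travels on the Read elements themselves.
    public static class ReadAll<T> extends PTransform<PCollection<Read<T>>, PCollection<T>> {
      private Coder<T> outputCoder;

      public ReadAll<T> withCoder(Coder<T> outputCoder) {
        this.outputCoder = outputCoder;
        return this;
      }

      @Override
      public PCollection<T> expand(PCollection<Read<T>> input) {
        PCollection<T> output =
            input
                .apply("Split", ParDo.of(new SplitFn<T>()))
                .apply("Reshuffle", Reshuffle.viaRandomKey())
                .apply("Read", ParDo.of(new ReadFn<T>()));
        // setCoder is only needed because T is generic; concrete IOs like SolrIO can
        // skip this and rely on the registered coder for their output type.
        return outputCoder == null ? output : output.setCoder(outputCoder);
      }
    }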
>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko < >>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> I believe that the initial goal of unifying ReadAll as a >>>>>>>>>>>>>>> general "PTransform<PCollection<Read>, >>>>>>>>>>>>>>> PCollection<OutputType>>” was to >>>>>>>>>>>>>>> reduce the amount of code duplication and error-prone approach >>>>>>>>>>>>>>> related to >>>>>>>>>>>>>>> this. It makes much sense since usually we have all needed >>>>>>>>>>>>>>> configuration >>>>>>>>>>>>>>> set in Read objects and, as Ismaeil mentioned, ReadAll will >>>>>>>>>>>>>>> consist mostly >>>>>>>>>>>>>>> of only Split-Shuffle-Read stages. So this case usually can be >>>>>>>>>>>>>>> unified by >>>>>>>>>>>>>>> using PCollection<Read> as input. >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> On the other hand, we have another need to use Java IOs >>>>>>>>>>>>>>> as cross-language transforms (as Luke described) which seems >>>>>>>>>>>>>>> only partly in >>>>>>>>>>>>>>> common with previous pattern of ReadAll using. >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> I’d be more in favour to have only one concept of read >>>>>>>>>>>>>>> configuration for all needs but seems it’s not easy and I’d be >>>>>>>>>>>>>>> more in >>>>>>>>>>>>>>> favour with Luke and Boyuan approach with schema. Though, maybe >>>>>>>>>>>>>>> ReadAll is >>>>>>>>>>>>>>> not a very suitable name in this case because it will can bring >>>>>>>>>>>>>>> some >>>>>>>>>>>>>>> confusions related to previous pattern of ReadAll uses. >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang < >>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> Sorry for the typo. I mean I think we can go with (3) and >>>>>>>>>>>>>>> (4): use the data type that is schema-aware as the input of >>>>>>>>>>>>>>> ReadAll. >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang < >>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>> >>> Thanks for the summary, Cham! >>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>> >>> I think we can go with (2) and (4): use the data type >>>>>>>>>>>>>>> that is schema-aware as the input of ReadAll. >>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>> >>> Converting Read into ReadAll helps us to stick with >>>>>>>>>>>>>>> SDF-like IO. But only having (3) is not enough to solve the >>>>>>>>>>>>>>> problem of >>>>>>>>>>>>>>> using ReadAll in x-lang case. >>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>> >>> The key point of ReadAll is that the input type of >>>>>>>>>>>>>>> ReadAll should be able to cross language boundaries and have >>>>>>>>>>>>>>> compatibilities of updating/downgrading. After investigating >>>>>>>>>>>>>>> some >>>>>>>>>>>>>>> possibilities(pure java pojo with custom coder, protobuf, >>>>>>>>>>>>>>> row/schema) in >>>>>>>>>>>>>>> Kafka usage, we find that row/schema fits our needs most. Here >>>>>>>>>>>>>>> comes (4). I >>>>>>>>>>>>>>> believe that using Read as input of ReadAll makes sense in some >>>>>>>>>>>>>>> cases, but >>>>>>>>>>>>>>> I also think not all IOs have the same need. I would treat Read >>>>>>>>>>>>>>> as a >>>>>>>>>>>>>>> special type as long as the Read is schema-aware. >>>>>>>>>>>>>>> >>> >>>>>>>>>>>>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath < >>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>> >>>> >>>>>>>>>>>>>>> >>>> I see. 
So it seems like there are three options >>>>>>>>>>>>>>> discussed so far when it comes to defining source descriptors >>>>>>>>>>>>>>> for ReadAll >>>>>>>>>>>>>>> type transforms >>>>>>>>>>>>>>> >>>> >>>>>>>>>>>>>>> >>>> (1) Use Read PTransform as the element type of the >>>>>>>>>>>>>>> input PCollection >>>>>>>>>>>>>>> >>>> (2) Use a POJO that describes the source as the data >>>>>>>>>>>>>>> element of the input PCollection >>>>>>>>>>>>>>> >>>> (3) Provide a converter as a function to the Read >>>>>>>>>>>>>>> transform which essentially will convert it to a ReadAll (what >>>>>>>>>>>>>>> Eugene >>>>>>>>>>>>>>> mentioned) >>>>>>>>>>>>>>> >>>> >>>>>>>>>>>>>>> >>>> I feel like (3) is more suitable for a related set of >>>>>>>>>>>>>>> source descriptions such as files. >>>>>>>>>>>>>>> >>>> (1) will allow most code-reuse but seems like will make >>>>>>>>>>>>>>> it hard to use the ReadAll transform as a cross-language >>>>>>>>>>>>>>> transform and will >>>>>>>>>>>>>>> break the separation of construction time and runtime constructs >>>>>>>>>>>>>>> >>>> (2) could result to less code reuse if not careful but >>>>>>>>>>>>>>> will make the transform easier to be used as a cross-language >>>>>>>>>>>>>>> transform >>>>>>>>>>>>>>> without additional modifications >>>>>>>>>>>>>>> >>>> >>>>>>>>>>>>>>> >>>> Also, with SDF, we can create ReadAll-like transforms >>>>>>>>>>>>>>> that are more efficient. So we might be able to just define all >>>>>>>>>>>>>>> sources in >>>>>>>>>>>>>>> that format and make Read transforms just an easy to use >>>>>>>>>>>>>>> composite built on >>>>>>>>>>>>>>> top of that (by adding a preceding Create transform). >>>>>>>>>>>>>>> >>>> >>>>>>>>>>>>>>> >>>> Thanks, >>>>>>>>>>>>>>> >>>> Cham >>>>>>>>>>>>>>> >>>> >>>>>>>>>>>>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik < >>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>> >>>>> >>>>>>>>>>>>>>> >>>>> I believe we do require PTransforms to be serializable >>>>>>>>>>>>>>> since anonymous DoFns typically capture the enclosing >>>>>>>>>>>>>>> PTransform. >>>>>>>>>>>>>>> >>>>> >>>>>>>>>>>>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath < >>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>> >>>>>> >>>>>>>>>>>>>>> >>>>>> Seems like Read in PCollection<Read> refers to a >>>>>>>>>>>>>>> transform, at least here: >>>>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353 >>>>>>>>>>>>>>> >>>>>> >>>>>>>>>>>>>>> >>>>>> I'm in favour of separating construction time >>>>>>>>>>>>>>> transforms from execution time data objects that we store in >>>>>>>>>>>>>>> PCollections >>>>>>>>>>>>>>> as Luke mentioned. Also, we don't guarantee that PTransform is >>>>>>>>>>>>>>> serializable >>>>>>>>>>>>>>> so users have the additional complexity of providing a corder >>>>>>>>>>>>>>> whenever a >>>>>>>>>>>>>>> PTransform is used as a data object. >>>>>>>>>>>>>>> >>>>>> Also, agree with Boyuan that using simple Java >>>>>>>>>>>>>>> objects that are convertible to Beam Rows allow us to make >>>>>>>>>>>>>>> these transforms >>>>>>>>>>>>>>> available to other SDKs through the cross-language transforms. >>>>>>>>>>>>>>> Using >>>>>>>>>>>>>>> transforms or complex sources as data objects will probably >>>>>>>>>>>>>>> make this >>>>>>>>>>>>>>> difficult. 
>>>>>>>>>>>>>>> >>>>>> >>>>>>>>>>>>>>> >>>>>> Thanks, >>>>>>>>>>>>>>> >>>>>> Cham >>>>>>>>>>>>>>> >>>>>> >>>>>>>>>>>>>>> >>>>>> >>>>>>>>>>>>>>> >>>>>> >>>>>>>>>>>>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang < >>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>> >>>>>>> >>>>>>>>>>>>>>> >>>>>>> Hi Ismael, >>>>>>>>>>>>>>> >>>>>>> >>>>>>>>>>>>>>> >>>>>>> I think the ReadAll in the IO connector refers to >>>>>>>>>>>>>>> the IO with SDF implementation despite the type of input, where >>>>>>>>>>>>>>> Read refers >>>>>>>>>>>>>>> to UnboundedSource. One major pushback of using KafkaIO.Read >>>>>>>>>>>>>>> as source >>>>>>>>>>>>>>> description is that not all configurations of KafkaIO.Read are >>>>>>>>>>>>>>> meaningful >>>>>>>>>>>>>>> to populate during execution time. Also when thinking about >>>>>>>>>>>>>>> x-lang useage, >>>>>>>>>>>>>>> making source description across language boundaries is also >>>>>>>>>>>>>>> necessary. As >>>>>>>>>>>>>>> Luke mentioned, it's quite easy to infer a Schema from an >>>>>>>>>>>>>>> AutoValue object: >>>>>>>>>>>>>>> KafkaSourceDescription.java. Then the coder of this >>>>>>>>>>>>>>> schema-aware object >>>>>>>>>>>>>>> will be a SchemaCoder. When crossing language boundaries, it's >>>>>>>>>>>>>>> also easy to >>>>>>>>>>>>>>> convert a Row into the source description: Convert.fromRows. >>>>>>>>>>>>>>> >>>>>>> >>>>>>>>>>>>>>> >>>>>>> >>>>>>>>>>>>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik < >>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>> >>>>>>>> >>>>>>>>>>>>>>> >>>>>>>> To provide additional context, the KafkaIO ReadAll >>>>>>>>>>>>>>> transform takes a PCollection<KafkaSourceDescriptor>. This >>>>>>>>>>>>>>> KafkaSourceDescriptor is a POJO that contains the configurable >>>>>>>>>>>>>>> parameters >>>>>>>>>>>>>>> for reading from Kafka. This is different from the pattern that >>>>>>>>>>>>>>> Ismael >>>>>>>>>>>>>>> listed because they take PCollection<Read> as input and the >>>>>>>>>>>>>>> Read is the >>>>>>>>>>>>>>> same as the Read PTransform class used for the non read all >>>>>>>>>>>>>>> case. >>>>>>>>>>>>>>> >>>>>>>> >>>>>>>>>>>>>>> >>>>>>>> The KafkaSourceDescriptor does lead to duplication >>>>>>>>>>>>>>> since parameters used to configure the transform have to be >>>>>>>>>>>>>>> copied over to >>>>>>>>>>>>>>> the source descriptor but decouples how a transform is >>>>>>>>>>>>>>> specified from the >>>>>>>>>>>>>>> object that describes what needs to be done. I believe Ismael's >>>>>>>>>>>>>>> point is >>>>>>>>>>>>>>> that we wouldn't need such a decoupling. >>>>>>>>>>>>>>> >>>>>>>> >>>>>>>>>>>>>>> >>>>>>>> Another area that hasn't been discussed and I >>>>>>>>>>>>>>> believe is a non-issue is that the Beam Java SDK has the most >>>>>>>>>>>>>>> IO connectors >>>>>>>>>>>>>>> and we would want to use the IO implementations within Beam Go >>>>>>>>>>>>>>> and Beam >>>>>>>>>>>>>>> Python. This brings in its own set of issues related to >>>>>>>>>>>>>>> versioning and >>>>>>>>>>>>>>> compatibility for the wire format and how one parameterizes such >>>>>>>>>>>>>>> transforms. The wire format issue can be solved with either >>>>>>>>>>>>>>> approach by >>>>>>>>>>>>>>> making sure that the cross language expansion always takes the >>>>>>>>>>>>>>> well known >>>>>>>>>>>>>>> format (whatever it may be) and converts it into >>>>>>>>>>>>>>> Read/KafkaSourceDescriptor/... object that is then passed to >>>>>>>>>>>>>>> the ReadAll >>>>>>>>>>>>>>> transform. 
Boyuan has been looking into making the KafkaSourceDescriptor have a schema so it can be represented as a row, and this can be done easily using the AutoValue integration (I don't believe there is anything preventing someone from writing a schema row -> Read -> row adapter, or from using the AutoValue configuration if the transform is also an AutoValue).

I would be more for the code duplication and separation of concerns provided by using a different object to represent the contents of the PCollection than the pipeline-construction-time PTransform.

On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <[email protected]> wrote:

Hi Ismael,

Thanks for taking this on. Have you considered an approach similar (or dual) to FileIO.write(), where we in a sense also have to configure a dynamic number of different IO transforms of the same type (file writes)?

E.g. here is how we configure many aspects of many file writes:

    transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
        .by(Transaction::getType)
        .via(tx -> tx.getType().toFields(tx),  // Convert the data to be written to CSVSink
             type -> new CSVSink(type.getFieldNames()))
        .to(".../path/to/")
        .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));

we could do something similar for many JdbcIO reads:

    PCollection<Bar> bars;  // user-specific type from which all the read parameters can be inferred
    PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
        .fromQuery(bar -> ...compute query for this bar...)
        .withMapper((bar, resultSet) -> new Moo(...))
        .withBatchSize(bar -> ...compute batch size for this bar...)
        ...etc);
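For comparison with the sketch above, the existing JdbcIO.readAll() already expresses per-element parameterization, but through dedicated setter-style objects rather than free-form closures. A minimal sketch (the driver, connection string and query are placeholders, and the snippet is illustrative rather than tested):

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.apache.beam.sdk.values.PCollection;

    // Each incoming id parameterizes one logical read; the ParameterSetter binds it
    // into the shared query, and the RowMapper turns each result row into output.
    static PCollection<String> readUserNames(PCollection<String> userIds) {
      return userIds.apply(
          JdbcIO.<String, String>readAll()
              .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                  "org.postgresql.Driver", "jdbc:postgresql://localhost/mydb"))
              .withQuery("SELECT name FROM users WHERE id = ?")
              .withParameterSetter((id, statement) -> statement.setString(1, id))
              .withRowMapper(resultSet -> resultSet.getString(1))
              .withCoder(StringUtf8Coder.of()));
    }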
On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <[email protected]> wrote:

Hello,

(my excuses for the long email but this requires context)

As part of the move from Source-based IOs to DoFn-based ones, one pattern emerged due to the composable nature of DoFn. The idea is to have a different kind of composable reads where we take a PCollection of different sorts of intermediate specifications, e.g. tables, queries, etc., for example:

JdbcIO:
ReadAll<ParameterT, OutputT> extends PTransform<PCollection<ParameterT>, PCollection<OutputT>>

RedisIO:
ReadAll extends PTransform<PCollection<String>, PCollection<KV<String, String>>>

HBaseIO:
ReadAll extends PTransform<PCollection<HBaseQuery>, PCollection<Result>>

These patterns enabled richer use cases like doing multiple queries in the same pipeline, querying based on key patterns, or querying from multiple tables at the same time, but they came with some maintenance issues:

- We ended up needing to add the parameters for the missing information to the ReadAll transforms, which resulted in lots of duplicated and error-prone code copied from the Read transforms into the ReadAll transforms.

- When you require new parameters you have to expand the input parameters of the intermediary specification into something that resembles the full `Read` definition. For example, imagine you want to read from multiple tables or servers as part of the same pipeline but this was not in the intermediate specification: you end up adding those extra methods (duplicating more code) just to get closer to the full Read spec.

- If new parameters are added to Read, we end up having to add them systematically to the ReadAll transform too so they are taken into account.

Due to these issues I recently made a change to test a new approach that is simpler, more complete and maintainable. The code became:

HBaseIO:
ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>

With this approach users gain the benefits of improvements to the parameters of the normal Read, because they get the full set of Read parameters.
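As a rough sketch of how the PCollection<Read> pattern reads from the pipeline author's side, based on the HBaseIO ReadAll mentioned above (the table names and configuration are illustrative):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.hbase.HBaseIO;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;

    // The elements of the input PCollection are fully configured Read transforms,
    // so every Read parameter is available per element with nothing duplicated
    // on the ReadAll itself.
    static PCollection<Result> readTwoTables(Pipeline pipeline) {
      Configuration config = HBaseConfiguration.create();
      return pipeline
          .apply(Create.of(
              HBaseIO.read().withConfiguration(config).withTableId("table1"),
              HBaseIO.read().withConfiguration(config).withTableId("table2")))
          .apply(HBaseIO.readAll());
    }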
But of course there are some minor caveats:

1. You need to push some information into the normal Reads, for example partition boundary information or Restriction information (in the SDF case). Notice that this consistent approach to ReadAll produces a simple pattern that ends up being almost reusable between IOs (e.g. the non-SDF case):

    public static class ReadAll extends PTransform<PCollection<Read>, PCollection<SolrDocument>> {
      @Override
      public PCollection<SolrDocument> expand(PCollection<Read> input) {
        return input
            .apply("Split", ParDo.of(new SplitFn()))
            .apply("Reshuffle", Reshuffle.viaRandomKey())
            .apply("Read", ParDo.of(new ReadFn()));
      }
    }

2. If you are using generic types for the ReadAll results, you must have the Coders used in its definition and require consistent types from the data sources; in practice this means we need to add extra withCoder method(s) on ReadAll, but not the full specs.

At the moment HBaseIO and SolrIO already follow this ReadAll pattern, and RedisIO and CassandraIO have WIP PRs to do so. So I wanted to bring this subject to the mailing list to see your opinions, and whether you see any sort of issues that we might be missing with this idea.

Also, I would like to see if we have consensus to start consistently using the terminology of ReadAll transforms based on Read, and the readAll() method, for new IOs (at this point, doing this in the only remaining inconsistent place, JdbcIO, might not be a good idea, but apart from this we should be ok).

I mention this because the recent PR on KafkaIO based on SDF is doing something similar to the old pattern but is called ReadAll, and maybe it is worth being consistent for the benefit of users.

Regards,
Ismaël
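A minimal sketch of caveat 2 above: a ReadAll with a generic output type cannot infer its output coder from the PCollection<Read> input, so it exposes a withCoder method and applies the coder in expand. Read stands for the IO's Read transform, as in the examples above; the class and ReadFn here are illustrative placeholders, not code from an existing IO:

    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    public static class ReadAll<OutputT>
        extends PTransform<PCollection<Read>, PCollection<OutputT>> {
      private Coder<OutputT> coder;

      // Extra method needed only because OutputT is generic; the full Read spec
      // does not need it.
      public ReadAll<OutputT> withCoder(Coder<OutputT> coder) {
        this.coder = coder;
        return this;
      }

      @Override
      public PCollection<OutputT> expand(PCollection<Read> input) {
        return input
            .apply("Read", ParDo.of(new ReadFn<OutputT>()))
            .setCoder(coder);
      }

      // Placeholder for the per-element read logic of the IO.
      private static class ReadFn<OutputT> extends DoFn<Read, OutputT> {
        @ProcessElement
        public void process(ProcessContext c) {
          // ... execute the Read described by c.element() and emit OutputT values ...
        }
      }
    }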
