I don't think an enum of the most common closures will work. The input types are typically generics that are made concrete by the caller, who also provides the closures. I think Luke's (2) is the same idea as my "Java still assembles it [using opaque Python closures/transforms]". It seems like an approach to (3). Passing actual code across could address some cases, but libraries become the issue.
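For illustration, a minimal sketch (hypothetical names, not an existing Beam API) of the shape in question: ParameterT and OutputT are made concrete by the caller, and the closures bridging them are caller-supplied lambdas over those caller-chosen types, which is why a fixed enum of common closures cannot enumerate them.

    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.values.PCollection;

    // Sketch only: a ReadAll-style transform parameterized by caller-chosen types.
    abstract class ReadAllViaClosures<ParameterT, OutputT>
        extends PTransform<PCollection<ParameterT>, PCollection<OutputT>> {
      // Caller-provided closures over caller-chosen generic types.
      abstract SerializableFunction<ParameterT, String> queryFn();
      abstract SerializableFunction<String, OutputT> resultMapperFn();
    }
    // A caller makes the generics concrete and supplies the closures, e.g.
    // bar -> "...query for this bar..." and line -> new Moo(...), as in the
    // JdbcIO.readAll() example further down the thread.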
I think it is fair to say that "WriteAll" style would involve entering unexplored territory. On the main topic, I think Brian has a pretty strong point, and his type-conversion lambdas illustrate it well. I did a quick survey and every other property I could find does seem to fit on the Read, and most IOs have a few of these closures, for example for extracting timestamps. So maybe just a resolution convention of putting them on the ReadAll and that taking precedence. Then you would be deserializing a Read transform with insta-crash methods or some such? Kenn
On Tue, Jun 30, 2020 at 10:24 AM Eugene Kirpichov <[email protected]> wrote: > Yeah, mainly I just feel like dynamic reads and dynamic writes (and > perhaps not-yet-invented similar transforms of other kinds) are tightly > related - they are either very similar, or are duals of each other - so > they should use the same approach. If they are using different approaches, > it is a sign that either one of them is being done wrong or that we are > running into a fundamental limitation of Beam (e.g. difficulty of encoding > closures compared to encoding elements). > > But I agree with Luke that we shouldn't give up on closures. Especially > with the work that has been done on schemas and SQL, I see no reason why we > couldn't express closures in a portable restricted sub-language. If we can > express SQL, we can express many or most use cases of dynamic reads/writes > - I don't mean that we should actually use SQL (though we *could* - e.g. > SQL scalar expressions seem powerful enough to express the closures > appearing in most use cases of FileIO.writeDynamic), I just mean that SQL > is an existence proof. > > (I don't want to rock the boat too much, just thought I'd chime in as this > topic is dear to my heart) >
> On Tue, Jun 30, 2020 at 9:59 AM Luke Cwik <[email protected]> wrote: > >> Kenn, I'm not too worried about closures since: >> 1) the expansion service for a transform could have a well-defined set of >> closures, addressable by name, that are returned as serialized objects that don't need to >> be interpretable by the caller >> 2) the language could store serialized functions of another language as >> constants >> 3) generic XLang function support will eventually be needed >> but I do agree that closures do make things difficult to express vs data, >> which is primarily why we should prefer data over closures when >> possible and use closures when expressing it with data would be too >> cumbersome. >> >> Brian, so far the cases that have been migrated have shown that the >> source descriptor and the Read transform are almost the same (some >> parameters that only impact pipeline construction such as coders differ). >>
>> On Mon, Jun 29, 2020 at 2:33 PM Brian Hulette <[email protected]> >> wrote: >> >>> Sorry for jumping into this late and casting a vote against the >>> consensus... but I think I'd prefer standardizing on a pattern like >>> PCollection<KafkaSourceDescriptor> rather than PCollection<Read>. That >>> approach clearly separates the parameters that are allowed to vary across a >>> ReadAll (the ones defined in KafkaSourceDescriptor) from the parameters >>> that should be constant (other parameters in the Read object, like >>> SerializedFunctions for type conversions, parameters for different >>> operating modes, etc...). I think it's helpful to think of the parameters >>> that are allowed to vary as some "location descriptor", but I imagine IO >>> authors may want other parameters to vary across a ReadAll as well. 
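As a rough sketch of the separation Brian describes (class and field names here are illustrative, not the actual KafkaSourceDescriptor), the per-element parameters can live in a schema-friendly descriptor while closures such as deserializers or timestamp extractors stay as configuration set once on the transform:

    import com.google.auto.value.AutoValue;
    import org.apache.beam.sdk.schemas.AutoValueSchema;
    import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

    // Only the parameters that are allowed to vary per element of the ReadAll input.
    @DefaultSchema(AutoValueSchema.class)
    @AutoValue
    abstract class MySourceDescriptor {
      abstract String getTopic();
      abstract Integer getPartition();
      abstract Long getStartOffset();

      static MySourceDescriptor of(String topic, int partition, long startOffset) {
        return new AutoValue_MySourceDescriptor(topic, partition, startOffset);
      }
    }
    // SerializableFunctions (deserializers, watermark/timestamp functions, ...)
    // would remain on the Read/ReadAll transform and be fixed at construction time.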
>>> >>> To me it seems safer to let an IO author "opt-in" to a parameter being >>> dynamic at execution time. >>> >>> Brian >>> >>> On Mon, Jun 29, 2020 at 9:26 AM Eugene Kirpichov <[email protected]> >>> wrote: >>> >>>> I'd like to raise one more time the question of consistency between >>>> dynamic reads and dynamic writes, per my email at the beginning of the >>>> thread. >>>> If the community prefers ReadAll to read from Read, then should >>>> dynamicWrite's write to Write? >>>> >>>> On Mon, Jun 29, 2020 at 8:57 AM Boyuan Zhang <[email protected]> >>>> wrote: >>>> >>>>> It seems like most of us agree on the idea that ReadAll should read >>>>> from Read. I'm going to update the Kafka ReadAll with the same pattern. >>>>> Thanks for all your help! >>>>> >>>>> On Fri, Jun 26, 2020 at 12:12 PM Chamikara Jayalath < >>>>> [email protected]> wrote: >>>>> >>>>>> >>>>>> >>>>>> On Fri, Jun 26, 2020 at 11:49 AM Luke Cwik <[email protected]> wrote: >>>>>> >>>>>>> I would also like to suggest that transforms that implement ReadAll >>>>>>> via Read should also provide methods like: >>>>>>> >>>>>>> // Uses the specified values if unspecified in the input element >>>>>>> from the PCollection<Read>. >>>>>>> withDefaults(Read read); >>>>>>> // Uses the specified values regardless of what the input element >>>>>>> from the PCollection<Read> specifies. >>>>>>> withOverrides(Read read); >>>>>>> >>>>>>> and only adds methods that are required at construction time (e.g. >>>>>>> coders). This way the majority of documentation sits on the Read >>>>>>> transform. >>>>>>> >>>>>> >>>>>> +0 from me. Sounds like benefits outweigh the drawbacks here and some >>>>>> of the drawbacks related to cross-language can be overcome through future >>>>>> advancements. >>>>>> Thanks for bringing this up Ismaël. >>>>>> >>>>>> - Cham >>>>>> >>>>>> >>>>>>> >>>>>>> On Fri, Jun 26, 2020 at 9:58 AM Luke Cwik <[email protected]> wrote: >>>>>>> >>>>>>>> Ismael, it is good to hear that using Read as the input didn't have >>>>>>>> a bunch of parameters that were being skipped/ignored. Also, for the >>>>>>>> polymorphism issue you have to rely on the user correctly telling you >>>>>>>> the >>>>>>>> type in such a way where it is a common ancestor of all the runtime >>>>>>>> types >>>>>>>> that will ever be used. This usually boils down to something like >>>>>>>> Serializable or DynamicMessage such that the coder that is chosen >>>>>>>> works for >>>>>>>> all the runtime types. Using multiple types is a valid use case and >>>>>>>> would >>>>>>>> allow for a simpler graph with less flattens merging the output from >>>>>>>> multiple sources. >>>>>>>> >>>>>>>> Boyuan, as you have mentioned we can have a coder for KafkaIO.Read >>>>>>>> which uses schemas even if some of the parameters can't be represented >>>>>>>> in a >>>>>>>> meaningful way beyond "bytes". This would be helpful for cross >>>>>>>> language as >>>>>>>> well since every parameter would become available if a language could >>>>>>>> support it (e.g. it could serialize a java function up front and keep >>>>>>>> it >>>>>>>> saved as raw bytes within said language). Even if we figure out a >>>>>>>> better >>>>>>>> way to do this in the future, we'll have to change the schema for the >>>>>>>> new >>>>>>>> way anyway. This would mean that the external version of the transform >>>>>>>> adopts Row to Read and we drop KafkaSourceDescriptor. The conversion >>>>>>>> from >>>>>>>> Row to Read could validate that the parameters make sense (e.g. 
the >>>>>>>> bytes >>>>>>>> are valid serialized functions). The addition of an >>>>>>>> endReadTime/endReadOffset would make sense for KafkaIO.Read as well and >>>>>>>> this would enable having a bounded version that could be used for >>>>>>>> backfills >>>>>>>> (this doesn't have to be done as part of any current ongoing PR). >>>>>>>> Essentially any parameter that could be added for a single instance of >>>>>>>> a >>>>>>>> Kafka element+restriction would also make sense to the KafkaIO.Read >>>>>>>> transform since it too is a single instance. There are parameters that >>>>>>>> would apply to the ReadAll that wouldn't apply to a read and these >>>>>>>> would be >>>>>>>> global parameters across all element+restriction pairs such as config >>>>>>>> overrides or default values. >>>>>>>> >>>>>>>> I am convinced that we should do as Ismael is suggesting and use >>>>>>>> KafkaIO.Read as the type. >>>>>>>> >>>>>>>> >>>>>>>> On Thu, Jun 25, 2020 at 6:00 PM Chamikara Jayalath < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>>>> Discussion regarding cross-language transforms is a slight tangent >>>>>>>>> here. But I think, in general, it's great if we can use existing >>>>>>>>> transforms >>>>>>>>> (for example, IO connectors) as cross-language transforms without >>>>>>>>> having to >>>>>>>>> build more composites (irrespective of whether in >>>>>>>>> ExternalTransformBuilders >>>>>>>>> or a user pipelines) just to make them cross-language compatible. A >>>>>>>>> future >>>>>>>>> cross-language compatible SchemaCoder might help (assuming that works >>>>>>>>> for >>>>>>>>> Read transform) but I'm not sure we have a good idea when we'll get >>>>>>>>> to that >>>>>>>>> state. >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> Cham >>>>>>>>> >>>>>>>>> On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang <[email protected]> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> For unbounded SDF in Kafka, we also consider the >>>>>>>>>> upgrading/downgrading compatibility in the pipeline update >>>>>>>>>> scenario(For >>>>>>>>>> detailed discussion, please refer to >>>>>>>>>> https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E). >>>>>>>>>> In order to obtain the compatibility, it requires the input of the >>>>>>>>>> read SDF >>>>>>>>>> is schema-aware. >>>>>>>>>> >>>>>>>>>> Thus the major constraint of mapping KafkaSourceDescriptor to >>>>>>>>>> PCollection<Read> is, the KafkaIO.Read also needs to be schema-aware, >>>>>>>>>> otherwise pipeline updates might fail unnecessarily. If looking into >>>>>>>>>> KafkaIO.Read, not all necessary fields are compatible with schema, >>>>>>>>>> for >>>>>>>>>> example, SerializedFunction. >>>>>>>>>> >>>>>>>>>> I'm kind of confused by why ReadAll<Read, OutputT> is a common >>>>>>>>>> pattern for SDF based IO. The Read can be a common pattern because >>>>>>>>>> the >>>>>>>>>> input is always a PBegin. But for an SDF based IO, the input can be >>>>>>>>>> anything. By using Read as input, we will still have the maintenance >>>>>>>>>> cost >>>>>>>>>> when SDF IO supports a new field but Read doesn't consume it. For >>>>>>>>>> example, >>>>>>>>>> we are discussing adding endOffset and endReadTime to >>>>>>>>>> KafkaSourceDescriptior, which is not used in KafkaIO.Read. >>>>>>>>>> >>>>>>>>>> On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía <[email protected]> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> We forgot to mention (5) External.Config used in cross-lang, see >>>>>>>>>>> KafkaIO >>>>>>>>>>> ExternalTransformBuilder. 
This approach is the predecessor of >>>>>>>>>>> (4) and probably a >>>>>>>>>>> really good candidate to be replaced by the Row based >>>>>>>>>>> Configuration Boyuan is >>>>>>>>>>> envisioning (so good to be aware of this). >>>>>>>>>>> >>>>>>>>>>> Thanks for the clear explanation Luke you mention the real >>>>>>>>>>> issue(s). All the >>>>>>>>>>> approaches discussed so far in the end could be easily >>>>>>>>>>> transformed to produce a >>>>>>>>>>> PCollection<Read> and those Read Elements could be read by the >>>>>>>>>>> generic ReadAll >>>>>>>>>>> transform. Notice that this can be internal in some IOs e.g. >>>>>>>>>>> KafkaIO if they >>>>>>>>>>> decide not to expose it. I am not saying that we should force >>>>>>>>>>> every IO to >>>>>>>>>>> support ReadAll in its public API but if we do it is probably a >>>>>>>>>>> good idea to be >>>>>>>>>>> consistent with naming the transform that expects an input >>>>>>>>>>> PCollection<Read> in >>>>>>>>>>> the same way. Also notice that using it will save us of the >>>>>>>>>>> maintenance issues >>>>>>>>>>> discussed in my previous email. >>>>>>>>>>> >>>>>>>>>>> Back to the main concern: the consequences of expansion based on >>>>>>>>>>> Read: So far I >>>>>>>>>>> have not seen consequences for the Splitting part which maps >>>>>>>>>>> really nice >>>>>>>>>>> assuming the Partition info / Restriction is available as part >>>>>>>>>>> of Read. So far >>>>>>>>>>> there are not Serialization because Beam is already enforcing >>>>>>>>>>> this. Notice that >>>>>>>>>>> ReadAll expansion is almost ‘equivalent’ to a poor man SDF at >>>>>>>>>>> least for the >>>>>>>>>>> Bounded case (see the code in my previous email). For the other >>>>>>>>>>> points: >>>>>>>>>>> >>>>>>>>>>> > a) Do all properties set on a Read apply to the ReadAll? For >>>>>>>>>>> example, the >>>>>>>>>>> > Kafka Read implementation allows you to set the key and value >>>>>>>>>>> deserializers >>>>>>>>>>> > which are also used to dictate the output PCollection type. It >>>>>>>>>>> also allows you >>>>>>>>>>> > to set how the watermark should be computed. Technically a >>>>>>>>>>> user may want the >>>>>>>>>>> > watermark computation to be configurable per Read and they may >>>>>>>>>>> also want an >>>>>>>>>>> > output type which is polymorphic (e.g. >>>>>>>>>>> Pcollection<Serializable>). >>>>>>>>>>> >>>>>>>>>>> Most of the times they do but for parametric types we cannot >>>>>>>>>>> support different >>>>>>>>>>> types in the outputs of the Read or at least I did not find how >>>>>>>>>>> to do so (is >>>>>>>>>>> there a way to use multiple output Coders on Beam?), we saw this >>>>>>>>>>> in CassandraIO >>>>>>>>>>> and we were discussing adding explicitly these Coders or >>>>>>>>>>> Serializer >>>>>>>>>>> specific methods to the ReadAll transform. This is less nice >>>>>>>>>>> because it will >>>>>>>>>>> imply some repeated methods, but it is still a compromise to >>>>>>>>>>> gain the other >>>>>>>>>>> advantages. I suppose the watermark case you mention is similar >>>>>>>>>>> because you may >>>>>>>>>>> want the watermark to behave differently in each Read and we >>>>>>>>>>> probably don’t >>>>>>>>>>> support this, so it corresponds to the polymorphic category. >>>>>>>>>>> >>>>>>>>>>> > b) Read extends PTransform which brings its own object >>>>>>>>>>> modelling concerns. >>>>>>>>>>> >>>>>>>>>>> > During the implementations of ReadAll(PCollection<Read>), was >>>>>>>>>>> it discovered >>>>>>>>>>> > that some properties became runtime errors or were ignored if >>>>>>>>>>> they were set? 
>>>>>>>>>>> > If no, then the code deduplication is likely worth it because >>>>>>>>>>> we also get a >>>>>>>>>>> > lot of javadoc deduplication, but if yes is this an acceptable >>>>>>>>>>> user >>>>>>>>>>> > experience? >>>>>>>>>>> >>>>>>>>>>> No, not so far. This is an interesting part, notice that the >>>>>>>>>>> Read translation >>>>>>>>>>> ends up delegating the read bits to the ReadFn part of ReadAll >>>>>>>>>>> so the ReadFn is >>>>>>>>>>> the real read and must be aware and use all the parameters. >>>>>>>>>>> >>>>>>>>>>> @Override >>>>>>>>>>> public PCollection<SolrDocument> expand(PBegin input) { >>>>>>>>>>> return input.apply("Create", >>>>>>>>>>> Create.of(this)).apply("ReadAll", readAll()); >>>>>>>>>>> } >>>>>>>>>>> >>>>>>>>>>> I might be missing something for the Unbounded SDF case which is >>>>>>>>>>> the only case >>>>>>>>>>> we have not explored so far. I think one easy way to see the >>>>>>>>>>> limitations would >>>>>>>>>>> be in the ongoing KafkaIO SDF based implementation to try to map >>>>>>>>>>> KafkaSourceDescriptor to do the extra PCollection<Read> and the >>>>>>>>>>> Read logic on >>>>>>>>>>> the ReadAll with the SDF to see which constraints we hit, the >>>>>>>>>>> polymorphic ones >>>>>>>>>>> will be there for sure, maybe others will appear (not sure). >>>>>>>>>>> However it would be >>>>>>>>>>> interesting to see if we have a real gain in the maintenance >>>>>>>>>>> points, but well >>>>>>>>>>> let’s not forget also that KafkaIO has a LOT of knobs so >>>>>>>>>>> probably the generic >>>>>>>>>>> implementation could be relatively complex. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <[email protected]> >>>>>>>>>>> wrote: >>>>>>>>>>> > >>>>>>>>>>> > I had mentioned that approach 1 and approach 2 work for cross >>>>>>>>>>> language. The difference being that the cross language transform >>>>>>>>>>> would take >>>>>>>>>>> a well known definition and convert it to the Read transform. A >>>>>>>>>>> normal user >>>>>>>>>>> would have a pipeline that would look like: >>>>>>>>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> >>>>>>>>>>> PCollection<Output> >>>>>>>>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) -> >>>>>>>>>>> PCollection<Output> >>>>>>>>>>> > >>>>>>>>>>> > And in the cross language case this would look like: >>>>>>>>>>> > 1: PCollection<Row of SourceDescriptor> -> PTransform(Convert >>>>>>>>>>> Row to Read) -> PCollection<Read> -> PTransform(ReadAll) -> >>>>>>>>>>> PCollection<Output> >>>>>>>>>>> > 2: PCollection<Row of SourceDescriptor> -> PTransform(Convert >>>>>>>>>>> Row to SourceDescriptor) -> PCollection<SourceDescriptor> -> >>>>>>>>>>> PTransform(ReadAll) -> PCollection<Output>* >>>>>>>>>>> > * note that PTransform(Convert Row to SourceDescriptor) only >>>>>>>>>>> exists since we haven't solved how to use schemas with language >>>>>>>>>>> bound types >>>>>>>>>>> in a cross language way. SchemaCoder isn't portable but RowCoder is >>>>>>>>>>> which >>>>>>>>>>> is why the conversion step exists. We could have a solution for >>>>>>>>>>> this at >>>>>>>>>>> some point in time. >>>>>>>>>>> > >>>>>>>>>>> > My concern with using Read was around: >>>>>>>>>>> > a) Do all properties set on a Read apply to the ReadAll? For >>>>>>>>>>> example, the Kafka Read implementation allows you to set the key >>>>>>>>>>> and value >>>>>>>>>>> deserializers which are also used to dictate the output PCollection >>>>>>>>>>> type. >>>>>>>>>>> It also allows you to set how the watermark should be computed. 
>>>>>>>>>>> Technically >>>>>>>>>>> a user may want the watermark computation to be configurable per >>>>>>>>>>> Read and >>>>>>>>>>> they may also want an output type which is polymorphic (e.g. >>>>>>>>>>> PCollection<Serializable>). >>>>>>>>>>> > b) Read extends PTransform which brings its own object >>>>>>>>>>> modelling concerns. >>>>>>>>>>> > >>>>>>>>>>> > During the implementations of ReadAll(PCollection<Read>), was >>>>>>>>>>> it discovered that some properties became runtime errors or were >>>>>>>>>>> ignored if >>>>>>>>>>> they were set? If no, then the code deduplication is likely worth it >>>>>>>>>>> because we also get a lot of javadoc deduplication, but if yes is >>>>>>>>>>> this an >>>>>>>>>>> acceptable user experience? >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko < >>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>> >> >>>>>>>>>>> >> I believe that the initial goal of unifying ReadAll as a >>>>>>>>>>> general "PTransform<PCollection<Read>, PCollection<OutputType>>” >>>>>>>>>>> was to >>>>>>>>>>> reduce the amount of code duplication and error-prone approach >>>>>>>>>>> related to >>>>>>>>>>> this. It makes much sense since usually we have all needed >>>>>>>>>>> configuration >>>>>>>>>>> set in Read objects and, as Ismaeil mentioned, ReadAll will consist >>>>>>>>>>> mostly >>>>>>>>>>> of only Split-Shuffle-Read stages. So this case usually can be >>>>>>>>>>> unified by >>>>>>>>>>> using PCollection<Read> as input. >>>>>>>>>>> >> >>>>>>>>>>> >> On the other hand, we have another need to use Java IOs as >>>>>>>>>>> cross-language transforms (as Luke described) which seems only >>>>>>>>>>> partly in >>>>>>>>>>> common with previous pattern of ReadAll using. >>>>>>>>>>> >> >>>>>>>>>>> >> I’d be more in favour to have only one concept of read >>>>>>>>>>> configuration for all needs but seems it’s not easy and I’d be more >>>>>>>>>>> in >>>>>>>>>>> favour with Luke and Boyuan approach with schema. Though, maybe >>>>>>>>>>> ReadAll is >>>>>>>>>>> not a very suitable name in this case because it will can bring some >>>>>>>>>>> confusions related to previous pattern of ReadAll uses. >>>>>>>>>>> >> >>>>>>>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang <[email protected]> >>>>>>>>>>> wrote: >>>>>>>>>>> >> >>>>>>>>>>> >> Sorry for the typo. I mean I think we can go with (3) and >>>>>>>>>>> (4): use the data type that is schema-aware as the input of ReadAll. >>>>>>>>>>> >> >>>>>>>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang < >>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>> >>> >>>>>>>>>>> >>> Thanks for the summary, Cham! >>>>>>>>>>> >>> >>>>>>>>>>> >>> I think we can go with (2) and (4): use the data type that >>>>>>>>>>> is schema-aware as the input of ReadAll. >>>>>>>>>>> >>> >>>>>>>>>>> >>> Converting Read into ReadAll helps us to stick with SDF-like >>>>>>>>>>> IO. But only having (3) is not enough to solve the problem of using >>>>>>>>>>> ReadAll in x-lang case. >>>>>>>>>>> >>> >>>>>>>>>>> >>> The key point of ReadAll is that the input type of ReadAll >>>>>>>>>>> should be able to cross language boundaries and have >>>>>>>>>>> compatibilities of >>>>>>>>>>> updating/downgrading. After investigating some possibilities(pure >>>>>>>>>>> java pojo >>>>>>>>>>> with custom coder, protobuf, row/schema) in Kafka usage, we find >>>>>>>>>>> that >>>>>>>>>>> row/schema fits our needs most. Here comes (4). 
I believe that >>>>>>>>>>> using Read >>>>>>>>>>> as input of ReadAll makes sense in some cases, but I also think not >>>>>>>>>>> all IOs >>>>>>>>>>> have the same need. I would treat Read as a special type as long as >>>>>>>>>>> the >>>>>>>>>>> Read is schema-aware. >>>>>>>>>>> >>> >>>>>>>>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath < >>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>> >>>> >>>>>>>>>>> >>>> I see. So it seems like there are three options discussed >>>>>>>>>>> so far when it comes to defining source descriptors for ReadAll type >>>>>>>>>>> transforms >>>>>>>>>>> >>>> >>>>>>>>>>> >>>> (1) Use Read PTransform as the element type of the input >>>>>>>>>>> PCollection >>>>>>>>>>> >>>> (2) Use a POJO that describes the source as the data >>>>>>>>>>> element of the input PCollection >>>>>>>>>>> >>>> (3) Provide a converter as a function to the Read transform >>>>>>>>>>> which essentially will convert it to a ReadAll (what Eugene >>>>>>>>>>> mentioned) >>>>>>>>>>> >>>> >>>>>>>>>>> >>>> I feel like (3) is more suitable for a related set of >>>>>>>>>>> source descriptions such as files. >>>>>>>>>>> >>>> (1) will allow most code-reuse but seems like will make it >>>>>>>>>>> hard to use the ReadAll transform as a cross-language transform and >>>>>>>>>>> will >>>>>>>>>>> break the separation of construction time and runtime constructs >>>>>>>>>>> >>>> (2) could result to less code reuse if not careful but will >>>>>>>>>>> make the transform easier to be used as a cross-language transform >>>>>>>>>>> without >>>>>>>>>>> additional modifications >>>>>>>>>>> >>>> >>>>>>>>>>> >>>> Also, with SDF, we can create ReadAll-like transforms that >>>>>>>>>>> are more efficient. So we might be able to just define all sources >>>>>>>>>>> in that >>>>>>>>>>> format and make Read transforms just an easy to use composite built >>>>>>>>>>> on top >>>>>>>>>>> of that (by adding a preceding Create transform). >>>>>>>>>>> >>>> >>>>>>>>>>> >>>> Thanks, >>>>>>>>>>> >>>> Cham >>>>>>>>>>> >>>> >>>>>>>>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik < >>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> I believe we do require PTransforms to be serializable >>>>>>>>>>> since anonymous DoFns typically capture the enclosing PTransform. >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath < >>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>> >>>>>> >>>>>>>>>>> >>>>>> Seems like Read in PCollection<Read> refers to a >>>>>>>>>>> transform, at least here: >>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353 >>>>>>>>>>> >>>>>> >>>>>>>>>>> >>>>>> I'm in favour of separating construction time transforms >>>>>>>>>>> from execution time data objects that we store in PCollections as >>>>>>>>>>> Luke >>>>>>>>>>> mentioned. Also, we don't guarantee that PTransform is serializable >>>>>>>>>>> so >>>>>>>>>>> users have the additional complexity of providing a corder whenever >>>>>>>>>>> a >>>>>>>>>>> PTransform is used as a data object. >>>>>>>>>>> >>>>>> Also, agree with Boyuan that using simple Java objects >>>>>>>>>>> that are convertible to Beam Rows allow us to make these transforms >>>>>>>>>>> available to other SDKs through the cross-language transforms. Using >>>>>>>>>>> transforms or complex sources as data objects will probably make >>>>>>>>>>> this >>>>>>>>>>> difficult. 
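As an illustration of the Row-based hand-off Cham and Boyuan describe, a minimal sketch assuming a schema-aware descriptor such as the hypothetical MySourceDescriptor above; Convert.toRows()/Convert.fromRows() are the schema conversion transforms referenced in the thread:

    import org.apache.beam.sdk.schemas.transforms.Convert;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    class RowRoundTrip {
      // Descriptors cross the language boundary as portable Rows and are converted
      // back to the Java-side type just before the ReadAll expansion.
      static PCollection<MySourceDescriptor> roundTrip(PCollection<MySourceDescriptor> descriptors) {
        PCollection<Row> rows = descriptors.apply("ToRow", Convert.toRows());
        return rows.apply("FromRow", Convert.fromRows(MySourceDescriptor.class));
      }
    }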
>>>>>>>>>>> >>>>>> >>>>>>>>>>> >>>>>> Thanks, >>>>>>>>>>> >>>>>> Cham >>>>>>>>>>> >>>>>> >>>>>>>>>>> >>>>>> >>>>>>>>>>> >>>>>> >>>>>>>>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang < >>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>> >>>>>>> >>>>>>>>>>> >>>>>>> Hi Ismael, >>>>>>>>>>> >>>>>>> >>>>>>>>>>> >>>>>>> I think the ReadAll in the IO connector refers to the IO >>>>>>>>>>> with SDF implementation despite the type of input, where Read >>>>>>>>>>> refers to >>>>>>>>>>> UnboundedSource. One major pushback of using KafkaIO.Read as source >>>>>>>>>>> description is that not all configurations of KafkaIO.Read are >>>>>>>>>>> meaningful >>>>>>>>>>> to populate during execution time. Also when thinking about x-lang >>>>>>>>>>> useage, >>>>>>>>>>> making source description across language boundaries is also >>>>>>>>>>> necessary. As >>>>>>>>>>> Luke mentioned, it's quite easy to infer a Schema from an AutoValue >>>>>>>>>>> object: >>>>>>>>>>> KafkaSourceDescription.java. Then the coder of this schema-aware >>>>>>>>>>> object >>>>>>>>>>> will be a SchemaCoder. When crossing language boundaries, it's also >>>>>>>>>>> easy to >>>>>>>>>>> convert a Row into the source description: Convert.fromRows. >>>>>>>>>>> >>>>>>> >>>>>>>>>>> >>>>>>> >>>>>>>>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik < >>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>> >>>>>>>> >>>>>>>>>>> >>>>>>>> To provide additional context, the KafkaIO ReadAll >>>>>>>>>>> transform takes a PCollection<KafkaSourceDescriptor>. This >>>>>>>>>>> KafkaSourceDescriptor is a POJO that contains the configurable >>>>>>>>>>> parameters >>>>>>>>>>> for reading from Kafka. This is different from the pattern that >>>>>>>>>>> Ismael >>>>>>>>>>> listed because they take PCollection<Read> as input and the Read is >>>>>>>>>>> the >>>>>>>>>>> same as the Read PTransform class used for the non read all case. >>>>>>>>>>> >>>>>>>> >>>>>>>>>>> >>>>>>>> The KafkaSourceDescriptor does lead to duplication >>>>>>>>>>> since parameters used to configure the transform have to be copied >>>>>>>>>>> over to >>>>>>>>>>> the source descriptor but decouples how a transform is specified >>>>>>>>>>> from the >>>>>>>>>>> object that describes what needs to be done. I believe Ismael's >>>>>>>>>>> point is >>>>>>>>>>> that we wouldn't need such a decoupling. >>>>>>>>>>> >>>>>>>> >>>>>>>>>>> >>>>>>>> Another area that hasn't been discussed and I believe >>>>>>>>>>> is a non-issue is that the Beam Java SDK has the most IO connectors >>>>>>>>>>> and we >>>>>>>>>>> would want to use the IO implementations within Beam Go and Beam >>>>>>>>>>> Python. >>>>>>>>>>> This brings in its own set of issues related to versioning and >>>>>>>>>>> compatibility for the wire format and how one parameterizes such >>>>>>>>>>> transforms. The wire format issue can be solved with either >>>>>>>>>>> approach by >>>>>>>>>>> making sure that the cross language expansion always takes the well >>>>>>>>>>> known >>>>>>>>>>> format (whatever it may be) and converts it into >>>>>>>>>>> Read/KafkaSourceDescriptor/... object that is then passed to the >>>>>>>>>>> ReadAll >>>>>>>>>>> transform. 
Boyuan has been looking to make the >>>>>>>>>>> KafkaSourceDescriptor have a >>>>>>>>>>> schema so it can be represented as a row and this can be done >>>>>>>>>>> easily using >>>>>>>>>>> the AutoValue integration (I don't believe there is anything >>>>>>>>>>> preventing >>>>>>>>>>> someone from writing a schema row -> Read -> row adapter or also >>>>>>>>>>> using the >>>>>>>>>>> AutoValue configuration if the transform is also an AutoValue). >>>>>>>>>>> >>>>>>>> >>>>>>>>>>> >>>>>>>> I would be more for the code duplication and separation >>>>>>>>>>> of concerns provided by using a different object to represent the >>>>>>>>>>> contents >>>>>>>>>>> of the PCollection from the pipeline construction time PTransform. >>>>>>>>>>> >>>>>>>> >>>>>>>>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov < >>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>> >>>>>>>>> >>>>>>>>>>> >>>>>>>>> Hi Ismael, >>>>>>>>>>> >>>>>>>>> >>>>>>>>>>> >>>>>>>>> Thanks for taking this on. Have you considered an >>>>>>>>>>> approach similar (or dual) to FileIO.write(), where we in a sense >>>>>>>>>>> also have >>>>>>>>>>> to configure a dynamic number different IO transforms of the same >>>>>>>>>>> type >>>>>>>>>>> (file writes)? >>>>>>>>>>> >>>>>>>>> >>>>>>>>>>> >>>>>>>>> E.g. how in this example we configure many aspects of >>>>>>>>>>> many file writes: >>>>>>>>>>> >>>>>>>>> >>>>>>>>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType, >>>>>>>>>>> Transaction>writeDynamic() >>>>>>>>>>> >>>>>>>>> .by(Transaction::getType) >>>>>>>>>>> >>>>>>>>> .via(tx -> tx.getType().toFields(tx), // Convert >>>>>>>>>>> the data to be written to CSVSink >>>>>>>>>>> >>>>>>>>> type -> new CSVSink(type.getFieldNames())) >>>>>>>>>>> >>>>>>>>> .to(".../path/to/") >>>>>>>>>>> >>>>>>>>> .withNaming(type -> defaultNaming(type + >>>>>>>>>>> "-transactions", ".csv")); >>>>>>>>>>> >>>>>>>>> >>>>>>>>>>> >>>>>>>>> we could do something similar for many JdbcIO reads: >>>>>>>>>>> >>>>>>>>> >>>>>>>>>>> >>>>>>>>> PCollection<Bar> bars; // user-specific type from >>>>>>>>>>> which all the read parameters can be inferred >>>>>>>>>>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, >>>>>>>>>>> Moo>readAll() >>>>>>>>>>> >>>>>>>>> .fromQuery(bar -> ...compute query for this bar...) >>>>>>>>>>> >>>>>>>>> .withMapper((bar, resultSet) -> new Moo(...)) >>>>>>>>>>> >>>>>>>>> .withBatchSize(bar -> ...compute batch size for this >>>>>>>>>>> bar...) >>>>>>>>>>> >>>>>>>>> ...etc); >>>>>>>>>>> >>>>>>>>> >>>>>>>>>>> >>>>>>>>> >>>>>>>>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía < >>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> Hello, >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> (my excuses for the long email but this requires >>>>>>>>>>> context) >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> As part of the move from Source based IOs to DoFn >>>>>>>>>>> based ones. One pattern >>>>>>>>>>> >>>>>>>>>> emerged due to the composable nature of DoFn. The >>>>>>>>>>> idea is to have a different >>>>>>>>>>> >>>>>>>>>> kind of composable reads where we take a PCollection >>>>>>>>>>> of different sorts of >>>>>>>>>>> >>>>>>>>>> intermediate specifications e.g. 
tables, queries, >>>>>>>>>>> etc, for example: >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> JdbcIO: >>>>>>>>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends >>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<ParameterT>, >>>>>>>>>>> PCollection<OutputT>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> RedisIO: >>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<String>, >>>>>>>>>>> PCollection<KV<String, String>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> HBaseIO: >>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>, >>>>>>>>>>> PCollection<Result>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> These patterns enabled richer use cases like doing >>>>>>>>>>> multiple queries in the same >>>>>>>>>>> >>>>>>>>>> Pipeline, querying based on key patterns or querying >>>>>>>>>>> from multiple tables at the >>>>>>>>>>> >>>>>>>>>> same time but came with some maintenance issues: >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> - We ended up needing to add to the ReadAll >>>>>>>>>>> transforms the parameters for >>>>>>>>>>> >>>>>>>>>> missing information so we ended up with lots of >>>>>>>>>>> duplicated with methods and >>>>>>>>>>> >>>>>>>>>> error-prone code from the Read transforms into the >>>>>>>>>>> ReadAll transforms. >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> - When you require new parameters you have to expand >>>>>>>>>>> the input parameters of the >>>>>>>>>>> >>>>>>>>>> intermediary specification into something that >>>>>>>>>>> resembles the full `Read` >>>>>>>>>>> >>>>>>>>>> definition for example imagine you want to read >>>>>>>>>>> from multiple tables or >>>>>>>>>>> >>>>>>>>>> servers as part of the same pipeline but this was >>>>>>>>>>> not in the intermediate >>>>>>>>>>> >>>>>>>>>> specification you end up adding those extra methods >>>>>>>>>>> (duplicating more code) >>>>>>>>>>> >>>>>>>>>> just o get close to the be like the Read full spec. >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> - If new parameters are added to the Read method we >>>>>>>>>>> end up adding them >>>>>>>>>>> >>>>>>>>>> systematically to the ReadAll transform too so they >>>>>>>>>>> are taken into account. >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> Due to these issues I recently did a change to test a >>>>>>>>>>> new approach that is >>>>>>>>>>> >>>>>>>>>> simpler, more complete and maintainable. The code >>>>>>>>>>> became: >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> HBaseIO: >>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>, >>>>>>>>>>> PCollection<Result>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> With this approach users gain benefits of >>>>>>>>>>> improvements on parameters of normal >>>>>>>>>>> >>>>>>>>>> Read because they count with the full Read >>>>>>>>>>> parameters. But of course there are >>>>>>>>>>> >>>>>>>>>> some minor caveats: >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> 1. You need to push some information into normal >>>>>>>>>>> Reads for example >>>>>>>>>>> >>>>>>>>>> partition boundaries information or Restriction >>>>>>>>>>> information (in the SDF >>>>>>>>>>> >>>>>>>>>> case). Notice that this consistent approach of >>>>>>>>>>> ReadAll produces a simple >>>>>>>>>>> >>>>>>>>>> pattern that ends up being almost reusable between >>>>>>>>>>> IOs (e.g. 
the non-SDF >>>>>>>>>>> >>>>>>>>>> case): >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> public static class ReadAll extends >>>>>>>>>>> PTransform<PCollection<Read>, >>>>>>>>>>> >>>>>>>>>> PCollection<SolrDocument>> { >>>>>>>>>>> >>>>>>>>>> @Override >>>>>>>>>>> >>>>>>>>>> public PCollection<SolrDocument> >>>>>>>>>>> expand(PCollection<Read> input) { >>>>>>>>>>> >>>>>>>>>> return input >>>>>>>>>>> >>>>>>>>>> .apply("Split", ParDo.of(new SplitFn())) >>>>>>>>>>> >>>>>>>>>> .apply("Reshuffle", >>>>>>>>>>> Reshuffle.viaRandomKey()) >>>>>>>>>>> >>>>>>>>>> .apply("Read", ParDo.of(new ReadFn())); >>>>>>>>>>> >>>>>>>>>> } >>>>>>>>>>> >>>>>>>>>> } >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> 2. If you are using Generic types for the results >>>>>>>>>>> ReadAll you must have the >>>>>>>>>>> >>>>>>>>>> Coders used in its definition and require >>>>>>>>>>> consistent types from the data >>>>>>>>>>> >>>>>>>>>> sources, in practice this means we need to add >>>>>>>>>>> extra withCoder method(s) on >>>>>>>>>>> >>>>>>>>>> ReadAll but not the full specs. >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow this >>>>>>>>>>> ReadAll pattern. RedisIO >>>>>>>>>>> >>>>>>>>>> and CassandraIO have already WIP PRs to do so. So I >>>>>>>>>>> wanted to bring this subject >>>>>>>>>>> >>>>>>>>>> to the mailing list to see your opinions, and if you >>>>>>>>>>> see any sort of issues that >>>>>>>>>>> >>>>>>>>>> we might be missing with this idea. >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> Also I would like to see if we have consensus to >>>>>>>>>>> start using consistently the >>>>>>>>>>> >>>>>>>>>> terminology of ReadAll transforms based on Read and >>>>>>>>>>> the readAll() method for new >>>>>>>>>>> >>>>>>>>>> IOs (at this point probably outdoing this in the only >>>>>>>>>>> remaining inconsistent >>>>>>>>>>> >>>>>>>>>> place in JdbcIO might not be a good idea but apart of >>>>>>>>>>> this we should be ok). >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> I mention this because the recent PR on KafkaIO based >>>>>>>>>>> on SDF is doing something >>>>>>>>>>> >>>>>>>>>> similar to the old pattern but being called ReadAll >>>>>>>>>>> and maybe it is worth to be >>>>>>>>>>> >>>>>>>>>> consistent for the benefit of users. >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> Regards, >>>>>>>>>>> >>>>>>>>>> Ismaël
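To make caveat 2 above concrete, a sketch of the withCoder shape (illustrative only, not any specific IO's API): with a generic output type the coder cannot be inferred from the Read elements, so only the ReadAll grows a withCoder method and applies it during expansion:

    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    abstract class GenericReadAll<ReadT, OutputT>
        extends PTransform<PCollection<ReadT>, PCollection<OutputT>> {
      private Coder<OutputT> outputCoder;

      // Extra method needed on ReadAll only, not on the full Read spec.
      GenericReadAll<ReadT, OutputT> withCoder(Coder<OutputT> coder) {
        this.outputCoder = coder;
        return this;
      }

      // The per-element read, written against the full Read spec carried by ReadT.
      protected abstract DoFn<ReadT, OutputT> readFn();

      @Override
      public PCollection<OutputT> expand(PCollection<ReadT> input) {
        PCollection<OutputT> output = input.apply("Read", ParDo.of(readFn()));
        // OutputT is generic, so the caller-supplied coder is set explicitly.
        return outputCoder == null ? output : output.setCoder(outputCoder);
      }
    }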
