On Thu, Jul 2, 2020 at 10:26 AM Kenneth Knowles <[email protected]> wrote:

>
> On Wed, Jul 1, 2020 at 4:17 PM Eugene Kirpichov <[email protected]>
> wrote:
>
>> Kenn - I don't mean an enum of common closures, I mean expressing
>> closures in a restricted sub-language such as the language of SQL
>> expressions.
>>
>
> My lack of clarity: enums was my phrasing of Luke's item 1). I understood
> what you meant. I think either a set of well-known closures or a tiny
> sublanguage could add value.
>
>
>> That would only work if there is a portable way to interpret SQL
>> expressions, but if there isn't, maybe there should be - for the sake of,
>> well, expressing closures portably. Of course these would be closures that
>> only work with rows - but that seems powerful enough for many if not most
>> purposes.
>>
>
> You can choose a SQL dialect or choose the tiniest subset just for this
> purpose and go with it. But when the data type going in or out of the
> lambda is e.g. some Java or Python object, then what? One idea is to always
> require these to be rows. But if you can really get away with a
> dependency-free context-free lambda, then Javascript or Python is as doable
> as SQL in terms of having a tiny restricted language for just this purpose.
> I would expect once it got used, folks would start to ask to include the
> rest of what the language has to offer - its ecosystem. This is always the
> main design point I am interested in for "lightweight" embedded UDF
> proposals.
>

This is getting off the topic of ReadAll, but I think being able to do
arbitrary computation in preceding/succeeding transform plus a (quite)
restricted language in the transform itself can go a long way. (For
example, one could have a dynamic destinations write that takes a
KV<element, dest> where dest is a format string like
"foo-{{shard_num}}.txt" to plug in the truly dynamic pieces, but the dest
string itself can be computed (e.g. based on the element) using arbitrary
code in the caller language.)
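
To make that concrete, here is a rough sketch in plain Python (no Beam
dependency). The helper names resolve_destination and destination_for are
made up for illustration, and I am using single-brace Python format
placeholders rather than any agreed-upon template syntax. The caller computes
the template with arbitrary code; the transform only fills in the truly
dynamic pieces.

```python
import string


def resolve_destination(template: str, **runtime_fields) -> str:
    """Fill the runtime-only placeholders of a destination template.

    This is the (quite) restricted part: the transform only substitutes
    named fields, it never runs caller code.
    """
    wanted = {name for _, name, _, _ in string.Formatter().parse(template) if name}
    missing = wanted - runtime_fields.keys()
    if missing:
        raise KeyError(f"unfilled placeholders: {sorted(missing)}")
    return template.format(**runtime_fields)


def destination_for(element: dict) -> str:
    """Arbitrary caller-language code that computes the template per element."""
    # The doubled braces survive the f-string as a literal {shard_num}.
    return f"{element['type']}-{{shard_num}}.txt"


element = {"type": "purchase", "amount": 12}
kv = (element, destination_for(element))  # what the write would receive
print(resolve_destination(kv[1], shard_num=3))
```

The point of the split is that only a string crosses the language boundary,
so nothing in the transform has to interpret caller closures.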


>
> Kenn
>
>
>> For example, maybe the Java example:
>>
>>  PCollection<BankTransaction> transactions = ...;
>>  transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
>>      .by(Transaction::getType)
>>      .via(tx -> tx.getType().toFields(tx),  // Convert the data to be
>> written to CSVSink
>>           type -> new CSVSink(type.getFieldNames()))
>>      .to(".../path/to/")
>>      .withNaming(type -> defaultNaming(type + "-transactions", ".csv"));
>>
>> could be written in Python as:
>>
>> transactions | fileio.write_dynamic(
>>   by="it.type",  # "it" is implicitly available in these SQL expressions
>> as the same thing as the Java lambda argument
>>   format="it.fields",
>>   sink="CSV_SINK(it.type.field_names)",  # A bunch of preset sinks
>> supported in every language?
>>   to=".../path/to/",
>>   naming="DEFAULT_NAMING(CONCAT(it, '-transactions'), '.csv')")
>>
>> Again, to be clear, I'm not suggesting to block what Ismael is proposing
>> on getting this done - getting this done wouldn't be a short term effort,
>> but seems potentially really nice.
>>
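
As a very rough illustration of how small such a sub-language could start
out, here is a sketch that accepts only dotted field paths on "it" plus
literals, and rejects everything else. It borrows Python's own parser via the
ast module purely for convenience; eval_restricted and the nested-dict row
representation are assumptions of this sketch, not a proposal for the actual
expression syntax.

```python
import ast


def eval_restricted(expr: str, it):
    """Evaluate a dotted field path such as 'it.type.name' against a row.

    Rows are modelled as nested dicts here. Anything other than the name
    'it', attribute access, or a plain literal is rejected.
    """
    def walk(node):
        if isinstance(node, ast.Name) and node.id == "it":
            return it
        if isinstance(node, ast.Attribute):
            return walk(node.value)[node.attr]
        if isinstance(node, ast.Constant) and isinstance(node.value, (str, int, float)):
            return node.value
        raise ValueError(f"unsupported expression: {type(node).__name__}")

    return walk(ast.parse(expr, mode="eval").body)


row = {"type": {"name": "purchase"}, "amount": 12}
print(eval_restricted("it.type.name", row))
```

Function calls such as CSV_SINK(...) would need an explicit whitelist on top
of this, which is exactly where the "folks will ask for the rest of the
language" pressure would show up.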
>>
>> On Wed, Jul 1, 2020 at 3:19 PM Robert Burke <[email protected]> wrote:
>>
>>> From the Go side of the table, the Go language doesn't provide a
>>> mechanism to serialize or access closure data, which means DoFns can't be
>>> functional closures. This, combined with the move to have the "Structural
>>> DoFns" be serialized using Beam Schemas, has the net result that if Go
>>> transforms are used for Cross Language, they will be configurable with a
>>> Schema of the configuration data.
>>>
>>> Of course, this just means that each language will probably provide
>>> whichever mechanisms it likes for use of its cross-language transforms.
>>>
>>> On Tue, 30 Jun 2020 at 16:07, Kenneth Knowles <[email protected]> wrote:
>>>
>>>> I don't think an enum of most common closures will work. The input
>>>> types are typically generics that are made concrete by the caller who also
>>>> provides the closures. I think Luke's (2) is the same idea as my "Java
>>>> still assembles it [using opaque Python closures/transforms]". It seems
>>>> like an approach to (3). Passing over actual code could address some cases,
>>>> but libraries become the issue.
>>>>
>>>> I think it is fair to say that "WriteAll" style would involve entering
>>>> unexplored territory.
>>>>
>>>> On the main topic, I think Brian has a pretty strong point and his
>>>> example of type conversion lambdas is a good example. I did a quick survey
>>>> and every other property I could find does seem like it fits on the Read,
>>>> and most IOs have a few of these closures for example also extracting
>>>> timestamps. So maybe just a resolution convention of putting them on the
>>>> ReadAll and that taking precedence. Then you would be deserializing a Read
>>>> transform with insta-crash methods or some such?
>>>>
>>>> Kenn
>>>>
>>>> On Tue, Jun 30, 2020 at 10:24 AM Eugene Kirpichov <[email protected]>
>>>> wrote:
>>>>
>>>>> Yeah, mainly I just feel like dynamic reads and dynamic writes (and
>>>>> perhaps not-yet-invented similar transforms of other kinds) are tightly
>>>>> related - they are either very similar, or are duals of each other - so
>>>>> they should use the same approach. If they are using different approaches,
>>>>> it is a sign that either one of them is being done wrong or that we are
>>>>> running into a fundamental limitation of Beam (e.g. difficulty of encoding
>>>>> closures compared to encoding elements).
>>>>>
>>>>> But I agree with Luke that we shouldn't give up on closures.
>>>>> Especially with the work that has been done on schemas and SQL, I see no
>>>>> reason why we couldn't express closures in a portable restricted
>>>>> sub-language. If we can express SQL, we can express many or most use cases
>>>>> of dynamic reads/writes - I don't mean that we should actually use SQL
>>>>> (though we *could* - e.g. SQL scalar expressions seem powerful enough
>>>>> to express the closures appearing in most use cases of
>>>>> FileIO.writeDynamic), I just mean that SQL is an existence proof.
>>>>>
>>>>> (I don't want to rock the boat too much, just thought I'd chime in as
>>>>> this topic is dear to my heart)
>>>>>
>>>>> On Tue, Jun 30, 2020 at 9:59 AM Luke Cwik <[email protected]> wrote:
>>>>>
>>>>>> Kenn, I'm not too worried about closures since:
>>>>>> 1) the expansion service for a transform could have a well-defined set
>>>>>> of closures by name that are returned as serialized objects that don't
>>>>>> need to be interpretable by the caller
>>>>>> 2) the language could store serialized functions of another language
>>>>>> as constants
>>>>>> 3) generic XLang function support will eventually be needed
>>>>>> but I do agree that closures make things difficult to express vs.
>>>>>> data, which is primarily why we should prefer data over closures when
>>>>>> possible and use closures when expressing it with data would be too
>>>>>> cumbersome.
>>>>>>
>>>>>> Brian, so far the cases that have been migrated have shown that the
>>>>>> source descriptor and the Read transform are almost the same (some
>>>>>> parameters that only impact pipeline construction such as coders differ).
>>>>>>
>>>>>> On Mon, Jun 29, 2020 at 2:33 PM Brian Hulette <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Sorry for jumping into this late and casting a vote against the
>>>>>>> consensus... but I think I'd prefer standardizing on a pattern like
>>>>>>> PCollection<KafkaSourceDescriptor> rather than PCollection<Read>. That
>>>>>>> approach clearly separates the parameters that are allowed to vary
>>>>>>> across a ReadAll (the ones defined in KafkaSourceDescriptor) from the
>>>>>>> parameters that should be constant (other parameters in the Read object,
>>>>>>> like SerializedFunctions for type conversions, parameters for different
>>>>>>> operating modes, etc...). I think it's helpful to think of the
>>>>>>> parameters that are allowed to vary as some "location descriptor", but I
>>>>>>> imagine IO authors may want other parameters to vary across a ReadAll as
>>>>>>> well.
>>>>>>>
>>>>>>> To me it seems safer to let an IO author "opt-in" to a parameter
>>>>>>> being dynamic at execution time.
>>>>>>>
>>>>>>> Brian
>>>>>>>
>>>>>>> On Mon, Jun 29, 2020 at 9:26 AM Eugene Kirpichov <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> I'd like to raise one more time the question of consistency between
>>>>>>>> dynamic reads and dynamic writes, per my email at the beginning of the
>>>>>>>> thread.
>>>>>>>> If the community prefers ReadAll to read from Read, then should
>>>>>>>> dynamicWrites write to Write?
>>>>>>>>
>>>>>>>> On Mon, Jun 29, 2020 at 8:57 AM Boyuan Zhang <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> It seems like most of us agree on the idea that ReadAll should
>>>>>>>>> read from Read. I'm going to update the Kafka ReadAll with the same
>>>>>>>>> pattern.
>>>>>>>>> Thanks for all your help!
>>>>>>>>>
>>>>>>>>> On Fri, Jun 26, 2020 at 12:12 PM Chamikara Jayalath <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Jun 26, 2020 at 11:49 AM Luke Cwik <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I would also like to suggest that transforms that implement
>>>>>>>>>>> ReadAll via Read should also provide methods like:
>>>>>>>>>>>
>>>>>>>>>>> // Uses the specified values if unspecified in the input element
>>>>>>>>>>> from the PCollection<Read>.
>>>>>>>>>>> withDefaults(Read read);
>>>>>>>>>>> // Uses the specified values regardless of what the input
>>>>>>>>>>> element from the PCollection<Read> specifies.
>>>>>>>>>>> withOverrides(Read read);
>>>>>>>>>>>
>>>>>>>>>>> and only add methods that are required at construction time
>>>>>>>>>>> (e.g. coders). This way the majority of documentation sits on the
>>>>>>>>>>> Read transform.
>>>>>>>>>>>
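
The precedence being proposed here reads to me as: overrides win over the
element, and the element wins over defaults. A minimal sketch of that
resolution, with a Read modelled as a plain dict of optional parameters (the
function name effective_read and the parameter names are hypothetical, not
anything in KafkaIO):

```python
from typing import Any, Mapping, Optional


def effective_read(
    element: Mapping[str, Any],
    defaults: Optional[Mapping[str, Any]] = None,
    overrides: Optional[Mapping[str, Any]] = None,
) -> dict:
    """Resolve one input element against withDefaults/withOverrides.

    A value of None on the element means 'unspecified'.
    """
    merged = dict(defaults or {})
    # Only parameters the element actually specifies replace a default.
    merged.update({k: v for k, v in element.items() if v is not None})
    # Overrides apply regardless of what the element specifies.
    merged.update(overrides or {})
    return merged


print(effective_read(
    {"topic": "orders", "bootstrap": None},
    defaults={"bootstrap": "host:9092", "topic": "fallback"},
    overrides={"group": "backfill"},
))
```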
>>>>>>>>>>
>>>>>>>>>> +0 from me. Sounds like benefits outweigh the drawbacks here and
>>>>>>>>>> some of the drawbacks related to cross-language can be overcome
>>>>>>>>>> through future advancements.
>>>>>>>>>> Thanks for bringing this up Ismaël.
>>>>>>>>>>
>>>>>>>>>> - Cham
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jun 26, 2020 at 9:58 AM Luke Cwik <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ismael, it is good to hear that using Read as the input didn't
>>>>>>>>>>>> have a bunch of parameters that were being skipped/ignored. Also,
>>>>>>>>>>>> for the polymorphism issue you have to rely on the user correctly
>>>>>>>>>>>> telling you the type in such a way that it is a common ancestor of
>>>>>>>>>>>> all the runtime types that will ever be used. This usually boils
>>>>>>>>>>>> down to something like Serializable or DynamicMessage such that the
>>>>>>>>>>>> coder that is chosen works for all the runtime types. Using multiple
>>>>>>>>>>>> types is a valid use case and would allow for a simpler graph with
>>>>>>>>>>>> fewer flattens merging the output from multiple sources.
>>>>>>>>>>>>
>>>>>>>>>>>> Boyuan, as you have mentioned we can have a coder for
>>>>>>>>>>>> KafkaIO.Read which uses schemas even if some of the parameters
>>>>>>>>>>>> can't be represented in a meaningful way beyond "bytes". This would
>>>>>>>>>>>> be helpful for cross language as well since every parameter would
>>>>>>>>>>>> become available if a language could support it (e.g. it could
>>>>>>>>>>>> serialize a Java function up front and keep it saved as raw bytes
>>>>>>>>>>>> within said language). Even if we figure out a better way to do this
>>>>>>>>>>>> in the future, we'll have to change the schema for the new way
>>>>>>>>>>>> anyway. This would mean that the external version of the transform
>>>>>>>>>>>> adopts Row to Read and we drop KafkaSourceDescriptor. The conversion
>>>>>>>>>>>> from Row to Read could validate that the parameters make sense (e.g.
>>>>>>>>>>>> the bytes are valid serialized functions). The addition of an
>>>>>>>>>>>> endReadTime/endReadOffset would make sense for KafkaIO.Read as well
>>>>>>>>>>>> and this would enable having a bounded version that could be used
>>>>>>>>>>>> for backfills (this doesn't have to be done as part of any current
>>>>>>>>>>>> ongoing PR). Essentially any parameter that could be added for a
>>>>>>>>>>>> single instance of a Kafka element+restriction would also make sense
>>>>>>>>>>>> to the KafkaIO.Read transform since it too is a single instance.
>>>>>>>>>>>> There are parameters that would apply to the ReadAll that wouldn't
>>>>>>>>>>>> apply to a read and these would be global parameters across all
>>>>>>>>>>>> element+restriction pairs such as config overrides or default
>>>>>>>>>>>> values.
>>>>>>>>>>>>
>>>>>>>>>>>> I am convinced that we should do as Ismael is suggesting and
>>>>>>>>>>>> use KafkaIO.Read as the type.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:00 PM Chamikara Jayalath <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Discussion regarding cross-language transforms is a slight
>>>>>>>>>>>>> tangent here. But I think, in general, it's great if we can use
>>>>>>>>>>>>> existing transforms (for example, IO connectors) as cross-language
>>>>>>>>>>>>> transforms without having to build more composites (irrespective of
>>>>>>>>>>>>> whether in ExternalTransformBuilders or user pipelines) just to
>>>>>>>>>>>>> make them cross-language compatible. A future cross-language
>>>>>>>>>>>>> compatible SchemaCoder might help (assuming that works for the Read
>>>>>>>>>>>>> transform) but I'm not sure we have a good idea when we'll get to
>>>>>>>>>>>>> that state.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> For unbounded SDF in Kafka, we also consider the
>>>>>>>>>>>>>> upgrading/downgrading compatibility in the pipeline update
>>>>>>>>>>>>>> scenario (for detailed discussion, please refer to
>>>>>>>>>>>>>> https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E).
>>>>>>>>>>>>>> To obtain that compatibility, the input of the read SDF must be
>>>>>>>>>>>>>> schema-aware.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thus the major constraint of mapping KafkaSourceDescriptor to
>>>>>>>>>>>>>> PCollection<Read> is that KafkaIO.Read also needs to be
>>>>>>>>>>>>>> schema-aware, otherwise pipeline updates might fail unnecessarily.
>>>>>>>>>>>>>> Looking into KafkaIO.Read, not all necessary fields are compatible
>>>>>>>>>>>>>> with schema, for example, SerializedFunction.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm kind of confused by why ReadAll<Read, OutputT> is a
>>>>>>>>>>>>>> common pattern for SDF based IO. The Read can be a common
>>>>>>>>>>>>>> pattern because the input is always a PBegin. But for an SDF based
>>>>>>>>>>>>>> IO, the input can be anything. By using Read as input, we will
>>>>>>>>>>>>>> still have the maintenance cost when SDF IO supports a new field
>>>>>>>>>>>>>> but Read doesn't consume it. For example, we are discussing adding
>>>>>>>>>>>>>> endOffset and endReadTime to KafkaSourceDescriptor, which is not
>>>>>>>>>>>>>> used in KafkaIO.Read.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We forgot to mention (5) External.Config used in cross-lang, see
>>>>>>>>>>>>>>> KafkaIO ExternalTransformBuilder. This approach is the predecessor
>>>>>>>>>>>>>>> of (4) and probably a really good candidate to be replaced by the
>>>>>>>>>>>>>>> Row based Configuration Boyuan is envisioning (so good to be aware
>>>>>>>>>>>>>>> of this).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the clear explanation Luke, you mention the real
>>>>>>>>>>>>>>> issue(s). All the approaches discussed so far could in the end be
>>>>>>>>>>>>>>> easily transformed to produce a PCollection<Read>, and those Read
>>>>>>>>>>>>>>> elements could be read by the generic ReadAll transform. Notice
>>>>>>>>>>>>>>> that this can be internal in some IOs, e.g. KafkaIO, if they
>>>>>>>>>>>>>>> decide not to expose it. I am not saying that we should force
>>>>>>>>>>>>>>> every IO to support ReadAll in its public API, but if we do it is
>>>>>>>>>>>>>>> probably a good idea to be consistent in naming the transform that
>>>>>>>>>>>>>>> expects an input PCollection<Read> in the same way. Also notice
>>>>>>>>>>>>>>> that using it will save us from the maintenance issues discussed
>>>>>>>>>>>>>>> in my previous email.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Back to the main concern: the consequences of expansion based on
>>>>>>>>>>>>>>> Read. So far I have not seen consequences for the Splitting part,
>>>>>>>>>>>>>>> which maps really nicely assuming the Partition info / Restriction
>>>>>>>>>>>>>>> is available as part of Read. So far there are no Serialization
>>>>>>>>>>>>>>> issues because Beam is already enforcing this. Notice that ReadAll
>>>>>>>>>>>>>>> expansion is almost ‘equivalent’ to a poor man's SDF, at least for
>>>>>>>>>>>>>>> the Bounded case (see the code in my previous email). For the
>>>>>>>>>>>>>>> other points:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > a) Do all properties set on a Read apply to the ReadAll? For
>>>>>>>>>>>>>>> > example, the Kafka Read implementation allows you to set the key
>>>>>>>>>>>>>>> > and value deserializers which are also used to dictate the output
>>>>>>>>>>>>>>> > PCollection type. It also allows you to set how the watermark
>>>>>>>>>>>>>>> > should be computed. Technically a user may want the watermark
>>>>>>>>>>>>>>> > computation to be configurable per Read and they may also want an
>>>>>>>>>>>>>>> > output type which is polymorphic (e.g. PCollection<Serializable>).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Most of the time they do, but for parametric types we cannot
>>>>>>>>>>>>>>> support different types in the outputs of the Read, or at least I
>>>>>>>>>>>>>>> did not find how to do so (is there a way to use multiple output
>>>>>>>>>>>>>>> Coders on Beam?). We saw this in CassandraIO and we were
>>>>>>>>>>>>>>> discussing explicitly adding these Coder or Serializer specific
>>>>>>>>>>>>>>> methods to the ReadAll transform. This is less nice because it
>>>>>>>>>>>>>>> will imply some repeated methods, but it is still a compromise to
>>>>>>>>>>>>>>> gain the other advantages. I suppose the watermark case you
>>>>>>>>>>>>>>> mention is similar because you may want the watermark to behave
>>>>>>>>>>>>>>> differently in each Read and we probably don’t support this, so
>>>>>>>>>>>>>>> it corresponds to the polymorphic category.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > b) Read extends PTransform which brings its own object modelling
>>>>>>>>>>>>>>> > concerns.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > During the implementations of ReadAll(PCollection<Read>), was it
>>>>>>>>>>>>>>> > discovered that some properties became runtime errors or were
>>>>>>>>>>>>>>> > ignored if they were set? If no, then the code deduplication is
>>>>>>>>>>>>>>> > likely worth it because we also get a lot of javadoc
>>>>>>>>>>>>>>> > deduplication, but if yes is this an acceptable user experience?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> No, not so far. This is an interesting part: notice that the Read
>>>>>>>>>>>>>>> translation ends up delegating the read bits to the ReadFn part of
>>>>>>>>>>>>>>> ReadAll, so the ReadFn is the real read and must be aware of and
>>>>>>>>>>>>>>> use all the parameters.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>     public PCollection<SolrDocument> expand(PBegin input) {
>>>>>>>>>>>>>>>       return input
>>>>>>>>>>>>>>>           .apply("Create", Create.of(this))
>>>>>>>>>>>>>>>           .apply("ReadAll", readAll());
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>
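
The shape of that delegation, stripped of Beam entirely, is sketched below in
plain Python. Read, read_fn and read_all are stand-ins invented for this
sketch (the backend dict plays the role of the external store); the only
thing it is meant to show is that Read.expand is "wrap myself in a
one-element collection, then ReadAll", so the per-element function is the
only place the parameters are actually consumed.

```python
from typing import Iterable, List


class Read:
    """Hypothetical single-source read configuration."""

    def __init__(self, query: str, backend: dict):
        self.query = query
        self.backend = backend

    def expand(self) -> List[str]:
        # Same shape as Create.of(this) followed by ReadAll.
        return read_all([self])


def read_fn(read: Read) -> List[str]:
    """The 'real' read: it must honor every parameter carried by the Read."""
    return list(read.backend.get(read.query, []))


def read_all(reads: Iterable[Read]) -> List[str]:
    """Apply read_fn to each Read element and concatenate the outputs."""
    out: List[str] = []
    for read in reads:
        out.extend(read_fn(read))
    return out


backend = {"a": ["d1", "d2"], "b": ["d3"]}
print(Read("a", backend).expand())
print(read_all([Read("a", backend), Read("b", backend)]))
```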
>>>>>>>>>>>>>>> I might be missing something for the Unbounded SDF case, which is
>>>>>>>>>>>>>>> the only case we have not explored so far. I think one easy way to
>>>>>>>>>>>>>>> see the limitations would be, in the ongoing KafkaIO SDF based
>>>>>>>>>>>>>>> implementation, to try to map KafkaSourceDescriptor to the extra
>>>>>>>>>>>>>>> PCollection<Read> and do the Read logic on the ReadAll with the
>>>>>>>>>>>>>>> SDF to see which constraints we hit. The polymorphic ones will be
>>>>>>>>>>>>>>> there for sure, maybe others will appear (not sure). However it
>>>>>>>>>>>>>>> would be interesting to see if we have a real gain in the
>>>>>>>>>>>>>>> maintenance points, but let’s not forget that KafkaIO has a LOT of
>>>>>>>>>>>>>>> knobs so probably the generic implementation could be relatively
>>>>>>>>>>>>>>> complex.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <[email protected]>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > I had mentioned that approach 1 and approach 2 work for cross
>>>>>>>>>>>>>>> > language. The difference being that the cross language transform
>>>>>>>>>>>>>>> > would take a well known definition and convert it to the Read
>>>>>>>>>>>>>>> > transform. A normal user would have a pipeline that would look
>>>>>>>>>>>>>>> > like:
>>>>>>>>>>>>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
>>>>>>>>>>>>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) ->
>>>>>>>>>>>>>>> >    PCollection<Output>
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > And in the cross language case this would look like:
>>>>>>>>>>>>>>> > 1: PCollection<Row of SourceDescriptor> ->
>>>>>>>>>>>>>>> >    PTransform(Convert Row to Read) -> PCollection<Read> ->
>>>>>>>>>>>>>>> >    PTransform(ReadAll) -> PCollection<Output>
>>>>>>>>>>>>>>> > 2: PCollection<Row of SourceDescriptor> ->
>>>>>>>>>>>>>>> >    PTransform(Convert Row to SourceDescriptor) ->
>>>>>>>>>>>>>>> >    PCollection<SourceDescriptor> -> PTransform(ReadAll) ->
>>>>>>>>>>>>>>> >    PCollection<Output>*
>>>>>>>>>>>>>>> > * note that PTransform(Convert Row to SourceDescriptor) only
>>>>>>>>>>>>>>> > exists since we haven't solved how to use schemas with language
>>>>>>>>>>>>>>> > bound types in a cross language way. SchemaCoder isn't portable
>>>>>>>>>>>>>>> > but RowCoder is, which is why the conversion step exists. We could
>>>>>>>>>>>>>>> > have a solution for this at some point in time.
>>>>>>>>>>>>>>> >
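
A minimal sketch of what the "Convert Row to SourceDescriptor" step amounts
to, again in plain Python rather than Beam. The Row is modelled as a dict,
and the SourceDescriptor fields (topic, partition, start_offset) are
illustrative guesses, not the actual KafkaSourceDescriptor schema. The
conversion is also the natural place to validate what came across the
language boundary.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass(frozen=True)
class SourceDescriptor:
    """Hypothetical language-bound descriptor; field names are illustrative."""
    topic: str
    partition: int
    start_offset: Optional[int] = None


def from_row(row: dict) -> SourceDescriptor:
    """Convert a portable row (a plain dict here) into the descriptor."""
    known = {f.name for f in fields(SourceDescriptor)}
    unknown = set(row) - known
    if unknown:
        raise ValueError(f"unknown fields: {sorted(unknown)}")
    # A missing required field surfaces as a TypeError from the constructor.
    return SourceDescriptor(**row)


print(from_row({"topic": "orders", "partition": 0}))
```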
>>>>>>>>>>>>>>> > My concern with using Read was around:
>>>>>>>>>>>>>>> > a) Do all properties set on a Read apply to the ReadAll? For
>>>>>>>>>>>>>>> > example, the Kafka Read implementation allows you to set the key
>>>>>>>>>>>>>>> > and value deserializers which are also used to dictate the output
>>>>>>>>>>>>>>> > PCollection type. It also allows you to set how the watermark
>>>>>>>>>>>>>>> > should be computed. Technically a user may want the watermark
>>>>>>>>>>>>>>> > computation to be configurable per Read and they may also want an
>>>>>>>>>>>>>>> > output type which is polymorphic (e.g. PCollection<Serializable>).
>>>>>>>>>>>>>>> > b) Read extends PTransform which brings its own object modelling
>>>>>>>>>>>>>>> > concerns.
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > During the implementations of ReadAll(PCollection<Read>), was it
>>>>>>>>>>>>>>> > discovered that some properties became runtime errors or were
>>>>>>>>>>>>>>> > ignored if they were set? If no, then the code deduplication is
>>>>>>>>>>>>>>> > likely worth it because we also get a lot of javadoc
>>>>>>>>>>>>>>> > deduplication, but if yes is this an acceptable user experience?
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>> >> I believe that the initial goal of unifying ReadAll as a general
>>>>>>>>>>>>>>> >> "PTransform<PCollection<Read>, PCollection<OutputType>>" was to
>>>>>>>>>>>>>>> >> reduce the amount of code duplication and the error-prone
>>>>>>>>>>>>>>> >> approach related to this. It makes much sense since usually we
>>>>>>>>>>>>>>> >> have all needed configuration set in Read objects and, as Ismaël
>>>>>>>>>>>>>>> >> mentioned, ReadAll will consist mostly of only Split-Shuffle-Read
>>>>>>>>>>>>>>> >> stages. So this case usually can be unified by using
>>>>>>>>>>>>>>> >> PCollection<Read> as input.
>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>> >> On the other hand, we have another need to use Java IOs as
>>>>>>>>>>>>>>> >> cross-language transforms (as Luke described) which seems only
>>>>>>>>>>>>>>> >> partly in common with the previous pattern of ReadAll usage.
>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>> >> I’d be more in favour of having only one concept of read
>>>>>>>>>>>>>>> >> configuration for all needs, but it seems that’s not easy, so I’d
>>>>>>>>>>>>>>> >> rather go with Luke and Boyuan's approach with schema. Though,
>>>>>>>>>>>>>>> >> maybe ReadAll is not a very suitable name in this case because it
>>>>>>>>>>>>>>> >> can cause some confusion with the previous pattern of ReadAll
>>>>>>>>>>>>>>> >> uses.
>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>> >> Sorry for the typo. I mean I think we can go with (3) and (4):
>>>>>>>>>>>>>>> >> use the data type that is schema-aware as the input of ReadAll.
>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>> >>>
>>>>>>>>>>>>>>> >>> Thanks for the summary, Cham!
>>>>>>>>>>>>>>> >>>
>>>>>>>>>>>>>>> >>> I think we can go with (2) and (4): use the data type that is
>>>>>>>>>>>>>>> >>> schema-aware as the input of ReadAll.
>>>>>>>>>>>>>>> >>>
>>>>>>>>>>>>>>> >>> Converting Read into ReadAll helps us to stick with SDF-like IO.
>>>>>>>>>>>>>>> >>> But only having (3) is not enough to solve the problem of using
>>>>>>>>>>>>>>> >>> ReadAll in the x-lang case.
>>>>>>>>>>>>>>> >>>
>>>>>>>>>>>>>>> >>> The key point of ReadAll is that the input type of ReadAll
>>>>>>>>>>>>>>> >>> should be able to cross language boundaries and stay compatible
>>>>>>>>>>>>>>> >>> across updating/downgrading. After investigating some
>>>>>>>>>>>>>>> >>> possibilities (pure java pojo with custom coder, protobuf,
>>>>>>>>>>>>>>> >>> row/schema) in Kafka usage, we find that row/schema fits our
>>>>>>>>>>>>>>> >>> needs most. Here comes (4). I believe that using Read as input
>>>>>>>>>>>>>>> >>> of ReadAll makes sense in some cases, but I also think not all
>>>>>>>>>>>>>>> >>> IOs have the same need. I would treat Read as a special type as
>>>>>>>>>>>>>>> >>> long as the Read is schema-aware.
>>>>>>>>>>>>>>> >>>
>>>>>>>>>>>>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>> >>>> I see. So it seems like there are three options discussed so
>>>>>>>>>>>>>>> >>>> far when it comes to defining source descriptors for ReadAll
>>>>>>>>>>>>>>> >>>> type transforms
>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>> >>>> (1) Use Read PTransform as the element type of the input
>>>>>>>>>>>>>>> >>>>     PCollection
>>>>>>>>>>>>>>> >>>> (2) Use a POJO that describes the source as the data element of
>>>>>>>>>>>>>>> >>>>     the input PCollection
>>>>>>>>>>>>>>> >>>> (3) Provide a converter as a function to the Read transform
>>>>>>>>>>>>>>> >>>>     which essentially will convert it to a ReadAll (what Eugene
>>>>>>>>>>>>>>> >>>>     mentioned)
>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>> >>>> I feel like (3) is more suitable for a related set of source
>>>>>>>>>>>>>>> >>>> descriptions such as files.
>>>>>>>>>>>>>>> >>>> (1) will allow most code-reuse but seems like it will make it
>>>>>>>>>>>>>>> >>>>     hard to use the ReadAll transform as a cross-language
>>>>>>>>>>>>>>> >>>>     transform and will break the separation of construction
>>>>>>>>>>>>>>> >>>>     time and runtime constructs
>>>>>>>>>>>>>>> >>>> (2) could result in less code reuse if not careful but will
>>>>>>>>>>>>>>> >>>>     make the transform easier to use as a cross-language
>>>>>>>>>>>>>>> >>>>     transform without additional modifications
>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>> >>>> Also, with SDF, we can create ReadAll-like transforms that are
>>>>>>>>>>>>>>> >>>> more efficient. So we might be able to just define all sources
>>>>>>>>>>>>>>> >>>> in that format and make Read transforms just an easy to use
>>>>>>>>>>>>>>> >>>> composite built on top of that (by adding a preceding Create
>>>>>>>>>>>>>>> >>>> transform).
>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>> >>>> Thanks,
>>>>>>>>>>>>>>> >>>> Cham
>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>>> >>>>> I believe we do require PTransforms to be serializable since
>>>>>>>>>>>>>>> >>>>> anonymous DoFns typically capture the enclosing PTransform.
>>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>> >>>>>> Seems like Read in PCollection<Read> refers to a
>>>>>>>>>>>>>>> transform, at least here:
>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>> >>>>>> I'm in favour of separating construction time transforms
>>>>>>>>>>>>>>> >>>>>> from execution time data objects that we store in
>>>>>>>>>>>>>>> >>>>>> PCollections as Luke mentioned. Also, we don't guarantee
>>>>>>>>>>>>>>> >>>>>> that PTransform is serializable so users have the additional
>>>>>>>>>>>>>>> >>>>>> complexity of providing a coder whenever a PTransform is
>>>>>>>>>>>>>>> >>>>>> used as a data object.
>>>>>>>>>>>>>>> >>>>>> Also, agree with Boyuan that using simple Java objects that
>>>>>>>>>>>>>>> >>>>>> are convertible to Beam Rows allows us to make these
>>>>>>>>>>>>>>> >>>>>> transforms available to other SDKs through the
>>>>>>>>>>>>>>> >>>>>> cross-language transforms. Using transforms or complex
>>>>>>>>>>>>>>> >>>>>> sources as data objects will probably make this difficult.
>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>> >>>>>> Thanks,
>>>>>>>>>>>>>>> >>>>>> Cham
>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>> >>>>>>>
>>>>>>>>>>>>>>> >>>>>>> Hi Ismael,
>>>>>>>>>>>>>>> >>>>>>>
>>>>>>>>>>>>>>> >>>>>>> I think the ReadAll in the IO connector refers to the IO
>>>>>>>>>>>>>>> >>>>>>> with SDF implementation regardless of the type of input,
>>>>>>>>>>>>>>> >>>>>>> where Read refers to UnboundedSource. One major pushback
>>>>>>>>>>>>>>> >>>>>>> against using KafkaIO.Read as source description is that
>>>>>>>>>>>>>>> >>>>>>> not all configurations of KafkaIO.Read are meaningful to
>>>>>>>>>>>>>>> >>>>>>> populate during execution time. Also when thinking about
>>>>>>>>>>>>>>> >>>>>>> x-lang usage, making the source description work across
>>>>>>>>>>>>>>> >>>>>>> language boundaries is also necessary. As Luke mentioned,
>>>>>>>>>>>>>>> >>>>>>> it's quite easy to infer a Schema from an AutoValue object:
>>>>>>>>>>>>>>> >>>>>>> KafkaSourceDescription.java. Then the coder of this
>>>>>>>>>>>>>>> >>>>>>> schema-aware object will be a SchemaCoder. When crossing
>>>>>>>>>>>>>>> >>>>>>> language boundaries, it's also easy to convert a Row into
>>>>>>>>>>>>>>> >>>>>>> the source description: Convert.fromRows.
>>>>>>>>>>>>>>> >>>>>>>
>>>>>>>>>>>>>>> >>>>>>>
>>>>>>>>>>>>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>> To provide additional context, the KafkaIO ReadAll
>>>>>>>>>>>>>>> transform takes a PCollection<KafkaSourceDescriptor>. This
>>>>>>>>>>>>>>> KafkaSourceDescriptor is a POJO that contains the configurable 
>>>>>>>>>>>>>>> parameters
>>>>>>>>>>>>>>> for reading from Kafka. This is different from the pattern that 
>>>>>>>>>>>>>>> Ismael
>>>>>>>>>>>>>>> listed because they take PCollection<Read> as input and the 
>>>>>>>>>>>>>>> Read is the
>>>>>>>>>>>>>>> same as the Read PTransform class used for the non read all 
>>>>>>>>>>>>>>> case.
>>>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>> The KafkaSourceDescriptor does lead to duplication
>>>>>>>>>>>>>>> since parameters used to configure the transform have to be 
>>>>>>>>>>>>>>> copied over to
>>>>>>>>>>>>>>> the source descriptor but decouples how a transform is 
>>>>>>>>>>>>>>> specified from the
>>>>>>>>>>>>>>> object that describes what needs to be done. I believe Ismael's 
>>>>>>>>>>>>>>> point is
>>>>>>>>>>>>>>> that we wouldn't need such a decoupling.
>>>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>> Another area that hasn't been discussed and I
>>>>>>>>>>>>>>> believe is a non-issue is that the Beam Java SDK has the most 
>>>>>>>>>>>>>>> IO connectors
>>>>>>>>>>>>>>> and we would want to use the IO implementations within Beam Go 
>>>>>>>>>>>>>>> and Beam
>>>>>>>>>>>>>>> Python. This brings in its own set of issues related to 
>>>>>>>>>>>>>>> versioning and
>>>>>>>>>>>>>>> compatibility for the wire format and how one parameterizes such
>>>>>>>>>>>>>>> transforms. The wire format issue can be solved with either 
>>>>>>>>>>>>>>> approach by
>>>>>>>>>>>>>>> making sure that the cross language expansion always takes the 
>>>>>>>>>>>>>>> well known
>>>>>>>>>>>>>>> format (whatever it may be) and converts it into
>>>>>>>>>>>>>>> Read/KafkaSourceDescriptor/... object that is then passed to 
>>>>>>>>>>>>>>> the ReadAll
>>>>>>>>>>>>>>> transform. Boyuan has been looking to make the 
>>>>>>>>>>>>>>> KafkaSourceDescriptor have a
>>>>>>>>>>>>>>> schema so it can be represented as a row and this can be done 
>>>>>>>>>>>>>>> easily using
>>>>>>>>>>>>>>> the AutoValue integration (I don't believe there is anything 
>>>>>>>>>>>>>>> preventing
>>>>>>>>>>>>>>> someone from writing a schema row -> Read -> row adapter or 
>>>>>>>>>>>>>>> also using the
>>>>>>>>>>>>>>> AutoValue configuration if the transform is also an AutoValue).
>>>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>> I would be more for the code duplication and
>>>>>>>>>>>>>>> separation of concerns provided by using a different object to 
>>>>>>>>>>>>>>> represent
>>>>>>>>>>>>>>> the contents of the PCollection from the pipeline construction 
>>>>>>>>>>>>>>> time
>>>>>>>>>>>>>>> PTransform.
>>>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>> Hi Ismael,
>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>> Thanks for taking this on. Have you considered an
>>>>>>>>>>>>>>> approach similar (or dual) to FileIO.write(), where we in a 
>>>>>>>>>>>>>>> sense also have
>>>>>>>>>>>>>>> to configure a dynamic number of different IO transforms of the 
>>>>>>>>>>>>>>> same type
>>>>>>>>>>>>>>> (file writes)?
>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>> E.g. how in this example we configure many aspects
>>>>>>>>>>>>>>> of many file writes:
>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType,
>>>>>>>>>>>>>>> Transaction>writeDynamic()
>>>>>>>>>>>>>>> >>>>>>>>>      .by(Transaction::getType)
>>>>>>>>>>>>>>> >>>>>>>>>      .via(tx -> tx.getType().toFields(tx),  //
>>>>>>>>>>>>>>> Convert the data to be written to CSVSink
>>>>>>>>>>>>>>> >>>>>>>>>           type -> new
>>>>>>>>>>>>>>> CSVSink(type.getFieldNames()))
>>>>>>>>>>>>>>> >>>>>>>>>      .to(".../path/to/")
>>>>>>>>>>>>>>> >>>>>>>>>      .withNaming(type -> defaultNaming(type +
>>>>>>>>>>>>>>> "-transactions", ".csv"));
>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>> we could do something similar for many JdbcIO
>>>>>>>>>>>>>>> reads:
>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>> PCollection<Bar> bars;  // user-specific type from
>>>>>>>>>>>>>>> which all the read parameters can be inferred
>>>>>>>>>>>>>>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar,
>>>>>>>>>>>>>>> Moo>readAll()
>>>>>>>>>>>>>>> >>>>>>>>>   .fromQuery(bar -> ...compute query for this
>>>>>>>>>>>>>>> bar...)
>>>>>>>>>>>>>>> >>>>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
>>>>>>>>>>>>>>> >>>>>>>>>   .withBatchSize(bar -> ...compute batch size for
>>>>>>>>>>>>>>> this bar...)
>>>>>>>>>>>>>>> >>>>>>>>>   ...etc);
>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> Hello,
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> (my excuses for the long email but this requires
>>>>>>>>>>>>>>> context)
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> As part of the move from Source based IOs to DoFn
>>>>>>>>>>>>>>> based ones, one pattern
>>>>>>>>>>>>>>> >>>>>>>>>> emerged due to the composable nature of DoFn. The
>>>>>>>>>>>>>>> idea is to have a different
>>>>>>>>>>>>>>> >>>>>>>>>> kind of composable reads where we take a
>>>>>>>>>>>>>>> PCollection of different sorts of
>>>>>>>>>>>>>>> >>>>>>>>>> intermediate specifications e.g. tables, queries,
>>>>>>>>>>>>>>> etc, for example:
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> JdbcIO:
>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends
>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<ParameterT>,
>>>>>>>>>>>>>>> PCollection<OutputT>>
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> RedisIO:
>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<String>,
>>>>>>>>>>>>>>> PCollection<KV<String, String>>>
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends
>>>>>>>>>>>>>>> PTransform<PCollection<HBaseQuery>, PCollection<Result>>
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> These patterns enabled richer use cases like
>>>>>>>>>>>>>>> doing multiple queries in the same
>>>>>>>>>>>>>>> >>>>>>>>>> Pipeline, querying based on key patterns or
>>>>>>>>>>>>>>> querying from multiple tables at the
>>>>>>>>>>>>>>> >>>>>>>>>> same time but came with some maintenance issues:
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> - We ended up needing to add to the ReadAll
>>>>>>>>>>>>>>> transforms the parameters for
>>>>>>>>>>>>>>> >>>>>>>>>>   missing information so we ended up with lots of
>>>>>>>>>>>>>>> duplicated with*() methods and
>>>>>>>>>>>>>>> >>>>>>>>>>   error-prone code copied from the Read transforms into
>>>>>>>>>>>>>>> the ReadAll transforms.
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> - When you require new parameters you have to
>>>>>>>>>>>>>>> expand the input parameters of the
>>>>>>>>>>>>>>> >>>>>>>>>>   intermediary specification into something that
>>>>>>>>>>>>>>> resembles the full `Read`
>>>>>>>>>>>>>>> >>>>>>>>>>   definition. For example, imagine you want to read
>>>>>>>>>>>>>>> from multiple tables or
>>>>>>>>>>>>>>> >>>>>>>>>>   servers as part of the same pipeline but this
>>>>>>>>>>>>>>> was not in the intermediate
>>>>>>>>>>>>>>> >>>>>>>>>>   specification: you end up adding those extra
>>>>>>>>>>>>>>> methods (duplicating more code)
>>>>>>>>>>>>>>> >>>>>>>>>>   just to get close to the full Read
>>>>>>>>>>>>>>> spec.
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> - If new parameters are added to the Read method
>>>>>>>>>>>>>>> we end up adding them
>>>>>>>>>>>>>>> >>>>>>>>>>   systematically to the ReadAll transform too so
>>>>>>>>>>>>>>> they are taken into account.
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> Due to these issues I recently did a change to
>>>>>>>>>>>>>>> test a new approach that is
>>>>>>>>>>>>>>> >>>>>>>>>> simpler, more complete and maintainable. The code
>>>>>>>>>>>>>>> became:
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>,
>>>>>>>>>>>>>>> PCollection<Result>>
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> With this approach users gain benefits of
>>>>>>>>>>>>>>> improvements on parameters of normal
>>>>>>>>>>>>>>> >>>>>>>>>> Read because they can rely on the full Read
>>>>>>>>>>>>>>> parameters. But of course there are
>>>>>>>>>>>>>>> >>>>>>>>>> some minor caveats:
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> 1. You need to push some information into normal
>>>>>>>>>>>>>>> Reads for example
>>>>>>>>>>>>>>> >>>>>>>>>>    partition boundaries information or
>>>>>>>>>>>>>>> Restriction information (in the SDF
>>>>>>>>>>>>>>> >>>>>>>>>>    case).  Notice that this consistent approach
>>>>>>>>>>>>>>> of ReadAll produces a simple
>>>>>>>>>>>>>>> >>>>>>>>>>    pattern that ends up being almost reusable
>>>>>>>>>>>>>>> between IOs (e.g. the non-SDF
>>>>>>>>>>>>>>> >>>>>>>>>>    case):
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>>   public static class ReadAll extends
>>>>>>>>>>>>>>> PTransform<PCollection<Read>,
>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<SolrDocument>> {
>>>>>>>>>>>>>>> >>>>>>>>>>     @Override
>>>>>>>>>>>>>>> >>>>>>>>>>     public PCollection<SolrDocument>
>>>>>>>>>>>>>>> expand(PCollection<Read> input) {
>>>>>>>>>>>>>>> >>>>>>>>>>       return input
>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Reshuffle",
>>>>>>>>>>>>>>> Reshuffle.viaRandomKey())
>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>>>>>>>>>>>>>> >>>>>>>>>>     }
>>>>>>>>>>>>>>> >>>>>>>>>>   }
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> 2. If you are using Generic types for the results
>>>>>>>>>>>>>>> of ReadAll you must have the
>>>>>>>>>>>>>>> >>>>>>>>>>    Coders used in its definition and require
>>>>>>>>>>>>>>> consistent types from the data
>>>>>>>>>>>>>>> >>>>>>>>>>    sources. In practice this means we need to add
>>>>>>>>>>>>>>> extra withCoder method(s) on
>>>>>>>>>>>>>>> >>>>>>>>>>    ReadAll but not the full specs.
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow
>>>>>>>>>>>>>>> this ReadAll pattern. RedisIO
>>>>>>>>>>>>>>> >>>>>>>>>> and CassandraIO already have WIP PRs to do so. So
>>>>>>>>>>>>>>> I wanted to bring this subject
>>>>>>>>>>>>>>> >>>>>>>>>> to the mailing list to see your opinions, and if
>>>>>>>>>>>>>>> you see any sort of issues that
>>>>>>>>>>>>>>> >>>>>>>>>> we might be missing with this idea.
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> Also I would like to see if we have consensus to
>>>>>>>>>>>>>>> start consistently using the
>>>>>>>>>>>>>>> >>>>>>>>>> terminology of ReadAll transforms based on Read
>>>>>>>>>>>>>>> and the readAll() method for new
>>>>>>>>>>>>>>> >>>>>>>>>> IOs (at this point undoing this in the
>>>>>>>>>>>>>>> only remaining inconsistent
>>>>>>>>>>>>>>> >>>>>>>>>> place, JdbcIO, might not be a good idea, but
>>>>>>>>>>>>>>> apart from this we should be ok).
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> I mention this because the recent PR on KafkaIO
>>>>>>>>>>>>>>> based on SDF is doing something
>>>>>>>>>>>>>>> >>>>>>>>>> similar to the old pattern but being called
>>>>>>>>>>>>>>> ReadAll and maybe it is worth being
>>>>>>>>>>>>>>> >>>>>>>>>> consistent for the benefit of users.
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> Regards,
>>>>>>>>>>>>>>> >>>>>>>>>> Ismaël
>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <[email protected]>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > I had mentioned that approach 1 and approach 2 work for
>>>>>>>>>>>>>>> cross language. The difference being that the cross language 
>>>>>>>>>>>>>>> transform
>>>>>>>>>>>>>>> would take a well known definition and convert it to the Read 
>>>>>>>>>>>>>>> transform. A
>>>>>>>>>>>>>>> normal user would have a pipeline that would look like:
>>>>>>>>>>>>>>> > 1: PCollection<Read> -> PTransform(ReadAll) ->
>>>>>>>>>>>>>>> PCollection<Output>
>>>>>>>>>>>>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) ->
>>>>>>>>>>>>>>> PCollection<Output>
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > And in the cross language case this would look like:
>>>>>>>>>>>>>>> > 1: PCollection<Row of SourceDescriptor> ->
>>>>>>>>>>>>>>> PTransform(Convert Row to Read) -> PCollection<Read> -> 
>>>>>>>>>>>>>>> PTransform(ReadAll)
>>>>>>>>>>>>>>> -> PCollection<Output>
>>>>>>>>>>>>>>> > 2: PCollection<Row of SourceDescriptor> ->
>>>>>>>>>>>>>>> PTransform(Convert Row to SourceDescriptor) ->
>>>>>>>>>>>>>>> PCollection<SourceDescriptor> -> PTransform(ReadAll) -> 
>>>>>>>>>>>>>>> PCollection<Output>*
>>>>>>>>>>>>>>> > * note that PTransform(Convert Row to SourceDescriptor)
>>>>>>>>>>>>>>> only exists since we haven't solved how to use schemas with 
>>>>>>>>>>>>>>> language bound
>>>>>>>>>>>>>>> types in a cross language way. SchemaCoder isn't portable but 
>>>>>>>>>>>>>>> RowCoder is
>>>>>>>>>>>>>>> which is why the conversion step exists. We could have a 
>>>>>>>>>>>>>>> solution for this
>>>>>>>>>>>>>>> at some point in time.
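
To make the "Convert Row to SourceDescriptor" step in approach 2 a bit more
concrete, here is a minimal plain-Java sketch with no Beam dependency. It is
only an illustration under assumptions: a Map stands in for a portable Row,
and SourceDescriptorSketch, its two fields, fromRow and toRow are hypothetical
names, not the actual KafkaSourceDescriptor or any Beam API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a schema-aware source descriptor.
// This is NOT the real KafkaSourceDescriptor; the fields are made up.
final class SourceDescriptorSketch {
  final String topic;
  final int partition;

  SourceDescriptorSketch(String topic, int partition) {
    this.topic = topic;
    this.partition = partition;
  }

  // The conversion step: a Map plays the role of a language-neutral Row.
  // Rows that do not match the expected "schema" are rejected up front.
  static SourceDescriptorSketch fromRow(Map<String, Object> row) {
    Object topic = row.get("topic");
    Object partition = row.get("partition");
    if (!(topic instanceof String) || !(partition instanceof Integer)) {
      throw new IllegalArgumentException("row does not match descriptor schema: " + row);
    }
    return new SourceDescriptorSketch((String) topic, (Integer) partition);
  }

  // Inverse direction, so a descriptor can be shipped across a language
  // boundary as a row and rebuilt on the other side.
  Map<String, Object> toRow() {
    Map<String, Object> row = new HashMap<>();
    row.put("topic", topic);
    row.put("partition", partition);
    return row;
  }

  public static void main(String[] args) {
    Map<String, Object> row = new HashMap<>();
    row.put("topic", "events");
    row.put("partition", 3);
    SourceDescriptorSketch d = fromRow(row);
    System.out.println(d.topic + "/" + d.partition);
  }
}
```

The point of the sketch is only that the descriptor is a plain data object
whose fields map one-to-one onto row fields, so the conversion is mechanical
in both directions.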
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > My concern with using Read was around:
>>>>>>>>>>>>>>> > a) Do all properties set on a Read apply to the ReadAll?
>>>>>>>>>>>>>>> For example, the Kafka Read implementation allows you to set 
>>>>>>>>>>>>>>> the key and
>>>>>>>>>>>>>>> value deserializers which are also used to dictate the output 
>>>>>>>>>>>>>>> PCollection
>>>>>>>>>>>>>>> type. It also allows you to set how the watermark should be 
>>>>>>>>>>>>>>> computed.
>>>>>>>>>>>>>>> Technically a user may want the watermark computation to be 
>>>>>>>>>>>>>>> configurable
>>>>>>>>>>>>>>> per Read and they may also want an output type which is 
>>>>>>>>>>>>>>> polymorphic (e.g.
>>>>>>>>>>>>>>> PCollection<Serializable>).
>>>>>>>>>>>>>>> > b) Read extends PTransform which brings its own object
>>>>>>>>>>>>>>> modelling concerns.
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > During the implementations of ReadAll(PCollection<Read>),
>>>>>>>>>>>>>>> was it discovered that some properties became runtime errors or 
>>>>>>>>>>>>>>> were
>>>>>>>>>>>>>>> ignored if they were set? If no, then the code deduplication is 
>>>>>>>>>>>>>>> likely
>>>>>>>>>>>>>>> worth it because we also get a lot of javadoc deduplication, 
>>>>>>>>>>>>>>> but if yes is
>>>>>>>>>>>>>>> this an acceptable user experience?
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>> >> I believe that the initial goal of unifying ReadAll as a
>>>>>>>>>>>>>>> general "PTransform<PCollection<Read>, 
>>>>>>>>>>>>>>> PCollection<OutputType>>" was to
>>>>>>>>>>>>>>> reduce the amount of code duplication and the error-prone approach 
>>>>>>>>>>>>>>> related to
>>>>>>>>>>>>>>> this. It makes a lot of sense since usually we have all the needed 
>>>>>>>>>>>>>>> configuration
>>>>>>>>>>>>>>> set in Read objects and, as Ismaël mentioned, ReadAll will 
>>>>>>>>>>>>>>> consist mostly
>>>>>>>>>>>>>>> of only Split-Shuffle-Read stages.  So this case usually can be 
>>>>>>>>>>>>>>> unified by
>>>>>>>>>>>>>>> using PCollection<Read> as input.
>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>> >> On the other hand, we have another need to use Java IOs
>>>>>>>>>>>>>>> as cross-language transforms (as Luke described) which seems to 
>>>>>>>>>>>>>>> only partly
>>>>>>>>>>>>>>> overlap with the previous pattern of ReadAll usage.
>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>> >> I’d be more in favour of having only one concept of read
>>>>>>>>>>>>>>> configuration for all needs, but it seems that is not easy, so I’d be 
>>>>>>>>>>>>>>> more in
>>>>>>>>>>>>>>> favour of Luke and Boyuan's approach with schema. Though, maybe 
>>>>>>>>>>>>>>> ReadAll is
>>>>>>>>>>>>>>> not a very suitable name in this case because it can cause 
>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>> confusion with the previous pattern of ReadAll uses.
>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>> >> Sorry for the typo. I mean I think we can go with (3) and
>>>>>>>>>>>>>>> (4): use the data type that is schema-aware as the input of 
>>>>>>>>>>>>>>> ReadAll.
>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>> >>>
>>>>>>>>>>>>>>> >>> Thanks for the summary, Cham!
>>>>>>>>>>>>>>> >>>
>>>>>>>>>>>>>>> >>> I think we can go with (2) and (4): use the data type
>>>>>>>>>>>>>>> that is schema-aware as the input of ReadAll.
>>>>>>>>>>>>>>> >>>
>>>>>>>>>>>>>>> >>> Converting Read into ReadAll helps us to stick with
>>>>>>>>>>>>>>> SDF-like IO. But only having (3) is not enough to solve the 
>>>>>>>>>>>>>>> problem of
>>>>>>>>>>>>>>> using ReadAll in x-lang case.
>>>>>>>>>>>>>>> >>>
>>>>>>>>>>>>>>> >>> The key point of ReadAll is that the input type of
>>>>>>>>>>>>>>> ReadAll should be able to cross language boundaries and stay
>>>>>>>>>>>>>>> compatible across updates/downgrades. After investigating 
>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>> possibilities (pure Java POJO with custom coder, protobuf, 
>>>>>>>>>>>>>>> row/schema) in
>>>>>>>>>>>>>>> Kafka usage, we find that row/schema fits our needs best. Here 
>>>>>>>>>>>>>>> comes (4). I
>>>>>>>>>>>>>>> believe that using Read as input of ReadAll makes sense in some 
>>>>>>>>>>>>>>> cases, but
>>>>>>>>>>>>>>> I also think not all IOs have the same need. I would treat Read 
>>>>>>>>>>>>>>> as a
>>>>>>>>>>>>>>> special type as long as the Read is schema-aware.
>>>>>>>>>>>>>>> >>>
>>>>>>>>>>>>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>> >>>> I see. So it seems like there are three options
>>>>>>>>>>>>>>> discussed so far when it comes to defining source descriptors 
>>>>>>>>>>>>>>> for ReadAll
>>>>>>>>>>>>>>> type transforms
>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>> >>>> (1) Use Read PTransform as the element type of the
>>>>>>>>>>>>>>> input PCollection
>>>>>>>>>>>>>>> >>>> (2) Use a POJO that describes the source as the data
>>>>>>>>>>>>>>> element of the input PCollection
>>>>>>>>>>>>>>> >>>> (3) Provide a converter as a function to the Read
>>>>>>>>>>>>>>> transform which essentially will convert it to a ReadAll (what 
>>>>>>>>>>>>>>> Eugene
>>>>>>>>>>>>>>> mentioned)
>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>> >>>> I feel like (3) is more suitable for a related set of
>>>>>>>>>>>>>>> source descriptions such as files.
>>>>>>>>>>>>>>> >>>> (1) will allow the most code reuse but seems like it will make
>>>>>>>>>>>>>>> it hard to use the ReadAll transform as a cross-language 
>>>>>>>>>>>>>>> transform and will
>>>>>>>>>>>>>>> break the separation of construction time and runtime constructs
>>>>>>>>>>>>>>> >>>> (2) could result in less code reuse if not careful but
>>>>>>>>>>>>>>> will make the transform easier to use as a cross-language 
>>>>>>>>>>>>>>> transform
>>>>>>>>>>>>>>> without additional modifications
>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>> >>>> Also, with SDF, we can create ReadAll-like transforms
>>>>>>>>>>>>>>> that are more efficient. So we might be able to just define all 
>>>>>>>>>>>>>>> sources in
>>>>>>>>>>>>>>> that format and make Read transforms just an easy to use 
>>>>>>>>>>>>>>> composite built on
>>>>>>>>>>>>>>> top of that (by adding a preceding Create transform).
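
The idea of Read being "just a composite on top of ReadAll with a preceding
Create" can be sketched in plain Java, without Beam. Everything here is an
assumption for illustration: lists stand in for PCollections, strings stand in
for source descriptors and records, and ReadAllSketch with its readAll and
read methods are hypothetical names rather than any existing IO.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical model: a List plays the role of a PCollection.
final class ReadAllSketch {
  // How to read the records of one source description.
  private final Function<String, List<String>> readOne;

  ReadAllSketch(Function<String, List<String>> readOne) {
    this.readOne = readOne;
  }

  // ReadAll: expand every descriptor in the input into its records.
  List<String> readAll(List<String> descriptors) {
    List<String> out = new ArrayList<>();
    for (String descriptor : descriptors) {
      out.addAll(readOne.apply(descriptor));
    }
    return out;
  }

  // Read: the same logic, preceded by a "Create" of a single descriptor.
  List<String> read(String descriptor) {
    return readAll(Collections.singletonList(descriptor));
  }

  public static void main(String[] args) {
    ReadAllSketch io = new ReadAllSketch(d -> Arrays.asList(d + "#0", d + "#1"));
    System.out.println(io.read("t1"));
    System.out.println(io.readAll(Arrays.asList("t1", "t2")));
  }
}
```

With this shape the reading logic lives in one place, and the single-source
Read carries no logic of its own beyond wrapping its one descriptor.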
>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>> >>>> Thanks,
>>>>>>>>>>>>>>> >>>> Cham
>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>>> >>>>> I believe we do require PTransforms to be serializable
>>>>>>>>>>>>>>> since anonymous DoFns typically capture the enclosing 
>>>>>>>>>>>>>>> PTransform.
>>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>> >>>>>> Seems like Read in PCollection<Read> refers to a
>>>>>>>>>>>>>>> transform, at least here:
>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>> >>>>>> I'm in favour of separating construction time
>>>>>>>>>>>>>>> transforms from execution time data objects that we store in 
>>>>>>>>>>>>>>> PCollections
>>>>>>>>>>>>>>> as Luke mentioned. Also, we don't guarantee that PTransform is 
>>>>>>>>>>>>>>> serializable
>>>>>>>>>>>>>>> so users have the additional complexity of providing a coder 
>>>>>>>>>>>>>>> whenever a
>>>>>>>>>>>>>>> PTransform is used as a data object.
>>>>>>>>>>>>>>> >>>>>> Also, agree with Boyuan that using simple Java
>>>>>>>>>>>>>>> objects that are convertible to Beam Rows allows us to make 
>>>>>>>>>>>>>>> these transforms
>>>>>>>>>>>>>>> available to other SDKs through the cross-language transforms. 
>>>>>>>>>>>>>>> Using
>>>>>>>>>>>>>>> transforms or complex sources as data objects will probably 
>>>>>>>>>>>>>>> make this
>>>>>>>>>>>>>>> difficult.
>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>> >>>>>> Thanks,
>>>>>>>>>>>>>>> >>>>>> Cham
>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>> >>>>>>>
>>>>>>>>>>>>>>> >>>>>>> Hi Ismael,
>>>>>>>>>>>>>>> >>>>>>>
>>>>>>>>>>>>>>> >>>>>>> I think the ReadAll in the IO connector refers to
>>>>>>>>>>>>>>> the IO with SDF implementation regardless of the type of input, 
>>>>>>>>>>>>>>> where Read refers
>>>>>>>>>>>>>>> to UnboundedSource.  One major pushback against using KafkaIO.Read 
>>>>>>>>>>>>>>> as the source
>>>>>>>>>>>>>>> description is that not all configurations of KafkaIO.Read are 
>>>>>>>>>>>>>>> meaningful
>>>>>>>>>>>>>>> to populate during execution time. Also, when thinking about 
>>>>>>>>>>>>>>> x-lang usage,
>>>>>>>>>>>>>>> the source description must be able to cross language boundaries.  
>>>>>>>>>>>>>>> As
>>>>>>>>>>>>>>> Luke mentioned, it's quite easy to infer a Schema from an 
>>>>>>>>>>>>>>> AutoValue object:
>>>>>>>>>>>>>>> KafkaSourceDescription.java. Then the coder of this 
>>>>>>>>>>>>>>> schema-aware object
>>>>>>>>>>>>>>> will be a SchemaCoder. When crossing language boundaries, it's 
>>>>>>>>>>>>>>> also easy to
>>>>>>>>>>>>>>> convert a Row into the source description: Convert.fromRows.
>>>>>>>>>>>>>>> >>>>>>>
>>>>>>>>>>>>>>> >>>>>>>
>>>>>>>>>>>>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>> To provide additional context, the KafkaIO ReadAll
>>>>>>>>>>>>>>> transform takes a PCollection<KafkaSourceDescriptor>. This
>>>>>>>>>>>>>>> KafkaSourceDescriptor is a POJO that contains the configurable 
>>>>>>>>>>>>>>> parameters
>>>>>>>>>>>>>>> for reading from Kafka. This is different from the pattern that 
>>>>>>>>>>>>>>> Ismael
>>>>>>>>>>>>>>> listed because they take PCollection<Read> as input and the 
>>>>>>>>>>>>>>> Read is the
>>>>>>>>>>>>>>> same as the Read PTransform class used for the non read all 
>>>>>>>>>>>>>>> case.
>>>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>> The KafkaSourceDescriptor does lead to duplication
>>>>>>>>>>>>>>> since parameters used to configure the transform have to be 
>>>>>>>>>>>>>>> copied over to
>>>>>>>>>>>>>>> the source descriptor but decouples how a transform is 
>>>>>>>>>>>>>>> specified from the
>>>>>>>>>>>>>>> object that describes what needs to be done. I believe Ismael's 
>>>>>>>>>>>>>>> point is
>>>>>>>>>>>>>>> that we wouldn't need such a decoupling.
>>>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>> Another area that hasn't been discussed and I
>>>>>>>>>>>>>>> believe is a non-issue is that the Beam Java SDK has the most 
>>>>>>>>>>>>>>> IO connectors
>>>>>>>>>>>>>>> and we would want to use the IO implementations within Beam Go 
>>>>>>>>>>>>>>> and Beam
>>>>>>>>>>>>>>> Python. This brings in its own set of issues related to 
>>>>>>>>>>>>>>> versioning and
>>>>>>>>>>>>>>> compatibility for the wire format and how one parameterizes such
>>>>>>>>>>>>>>> transforms. The wire format issue can be solved with either 
>>>>>>>>>>>>>>> approach by
>>>>>>>>>>>>>>> making sure that the cross language expansion always takes the 
>>>>>>>>>>>>>>> well known
>>>>>>>>>>>>>>> format (whatever it may be) and converts it into
>>>>>>>>>>>>>>> Read/KafkaSourceDescriptor/... object that is then passed to 
>>>>>>>>>>>>>>> the ReadAll
>>>>>>>>>>>>>>> transform. Boyuan has been looking to make the 
>>>>>>>>>>>>>>> KafkaSourceDescriptor have a
>>>>>>>>>>>>>>> schema so it can be represented as a row and this can be done 
>>>>>>>>>>>>>>> easily using
>>>>>>>>>>>>>>> the AutoValue integration (I don't believe there is anything 
>>>>>>>>>>>>>>> preventing
>>>>>>>>>>>>>>> someone from writing a schema row -> Read -> row adapter or 
>>>>>>>>>>>>>>> also using the
>>>>>>>>>>>>>>> AutoValue configuration if the transform is also an AutoValue).
>>>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>> I would be more for the code duplication and
>>>>>>>>>>>>>>> separation of concerns provided by using a different object to 
>>>>>>>>>>>>>>> represent
>>>>>>>>>>>>>>> the contents of the PCollection from the pipeline construction 
>>>>>>>>>>>>>>> time
>>>>>>>>>>>>>>> PTransform.
>>>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>> Hi Ismael,
>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>> Thanks for taking this on. Have you considered an
>>>>>>>>>>>>>>> approach similar (or dual) to FileIO.write(), where we in a 
>>>>>>>>>>>>>>> sense also have
>>>>>>>>>>>>>>> to configure a dynamic number of different IO transforms of the 
>>>>>>>>>>>>>>> same type
>>>>>>>>>>>>>>> (file writes)?
>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>> E.g. how in this example we configure many aspects
>>>>>>>>>>>>>>> of many file writes:
>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType,
>>>>>>>>>>>>>>> Transaction>writeDynamic()
>>>>>>>>>>>>>>> >>>>>>>>>      .by(Transaction::getType)
>>>>>>>>>>>>>>> >>>>>>>>>      .via(tx -> tx.getType().toFields(tx),  //
>>>>>>>>>>>>>>> Convert the data to be written to CSVSink
>>>>>>>>>>>>>>> >>>>>>>>>           type -> new
>>>>>>>>>>>>>>> CSVSink(type.getFieldNames()))
>>>>>>>>>>>>>>> >>>>>>>>>      .to(".../path/to/")
>>>>>>>>>>>>>>> >>>>>>>>>      .withNaming(type -> defaultNaming(type +
>>>>>>>>>>>>>>> "-transactions", ".csv"));
>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>> we could do something similar for many JdbcIO
>>>>>>>>>>>>>>> reads:
>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>> PCollection<Bar> bars;  // user-specific type from
>>>>>>>>>>>>>>> which all the read parameters can be inferred
>>>>>>>>>>>>>>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar,
>>>>>>>>>>>>>>> Moo>readAll()
>>>>>>>>>>>>>>> >>>>>>>>>   .fromQuery(bar -> ...compute query for this
>>>>>>>>>>>>>>> bar...)
>>>>>>>>>>>>>>> >>>>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
>>>>>>>>>>>>>>> >>>>>>>>>   .withBatchSize(bar -> ...compute batch size for
>>>>>>>>>>>>>>> this bar...)
>>>>>>>>>>>>>>> >>>>>>>>>   ...etc);
>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> Hello,
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> (my excuses for the long email but this requires
>>>>>>>>>>>>>>> context)
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> As part of the move from Source based IOs to DoFn
>>>>>>>>>>>>>>> based ones, one pattern
>>>>>>>>>>>>>>> >>>>>>>>>> emerged due to the composable nature of DoFn. The
>>>>>>>>>>>>>>> idea is to have a different
>>>>>>>>>>>>>>> >>>>>>>>>> kind of composable reads where we take a
>>>>>>>>>>>>>>> PCollection of different sorts of
>>>>>>>>>>>>>>> >>>>>>>>>> intermediate specifications e.g. tables, queries,
>>>>>>>>>>>>>>> etc, for example:
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> JdbcIO:
>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends
>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<ParameterT>,
>>>>>>>>>>>>>>> PCollection<OutputT>>
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> RedisIO:
>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<String>,
>>>>>>>>>>>>>>> PCollection<KV<String, String>>>
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends
>>>>>>>>>>>>>>> PTransform<PCollection<HBaseQuery>, PCollection<Result>>
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> These patterns enabled richer use cases like
>>>>>>>>>>>>>>> doing multiple queries in the same
>>>>>>>>>>>>>>> >>>>>>>>>> Pipeline, querying based on key patterns or
>>>>>>>>>>>>>>> querying from multiple tables at the
>>>>>>>>>>>>>>> >>>>>>>>>> same time but came with some maintenance issues:
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> - We ended up needing to add to the ReadAll
>>>>>>>>>>>>>>> transforms the parameters for
>>>>>>>>>>>>>>> >>>>>>>>>>   missing information so we ended up with lots of
>>>>>>>>>>>>>>> duplicated with*() methods and
>>>>>>>>>>>>>>> >>>>>>>>>>   error-prone code copied from the Read transforms into
>>>>>>>>>>>>>>> the ReadAll transforms.
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> - When you require new parameters you have to
>>>>>>>>>>>>>>> expand the input parameters of the
>>>>>>>>>>>>>>> >>>>>>>>>>   intermediary specification into something that
>>>>>>>>>>>>>>> resembles the full `Read`
>>>>>>>>>>>>>>> >>>>>>>>>>   definition. For example, imagine you want to read
>>>>>>>>>>>>>>> from multiple tables or
>>>>>>>>>>>>>>> >>>>>>>>>>   servers as part of the same pipeline but this
>>>>>>>>>>>>>>> was not in the intermediate
>>>>>>>>>>>>>>> >>>>>>>>>>   specification: you end up adding those extra
>>>>>>>>>>>>>>> methods (duplicating more code)
>>>>>>>>>>>>>>> >>>>>>>>>>   just to get close to the full Read
>>>>>>>>>>>>>>> spec.
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> - If new parameters are added to the Read method
>>>>>>>>>>>>>>> we end up adding them
>>>>>>>>>>>>>>> >>>>>>>>>>   systematically to the ReadAll transform too so
>>>>>>>>>>>>>>> they are taken into account.
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> Due to these issues I recently did a change to
>>>>>>>>>>>>>>> test a new approach that is
>>>>>>>>>>>>>>> >>>>>>>>>> simpler, more complete and maintainable. The code
>>>>>>>>>>>>>>> became:
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>,
>>>>>>>>>>>>>>> PCollection<Result>>
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> With this approach users benefit from any
>>>>>>>>>>>>>>> improvements to the parameters of the normal
>>>>>>>>>>>>>>> >>>>>>>>>> Read because they have the full Read
>>>>>>>>>>>>>>> parameters available. But of course there are
>>>>>>>>>>>>>>> >>>>>>>>>> some minor caveats:
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> 1. You need to push some information into normal
>>>>>>>>>>>>>>> Reads, for example
>>>>>>>>>>>>>>> >>>>>>>>>>    partition boundary information or
>>>>>>>>>>>>>>> Restriction information (in the SDF
>>>>>>>>>>>>>>> >>>>>>>>>>    case). Notice that this consistent approach
>>>>>>>>>>>>>>> to ReadAll produces a simple
>>>>>>>>>>>>>>> >>>>>>>>>>    pattern that ends up being almost reusable
>>>>>>>>>>>>>>> between IOs (e.g. the non-SDF
>>>>>>>>>>>>>>> >>>>>>>>>>    case):
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>>   public static class ReadAll extends
>>>>>>>>>>>>>>> PTransform<PCollection<Read>,
>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<SolrDocument>> {
>>>>>>>>>>>>>>> >>>>>>>>>>     @Override
>>>>>>>>>>>>>>> >>>>>>>>>>     public PCollection<SolrDocument>
>>>>>>>>>>>>>>> expand(PCollection<Read> input) {
>>>>>>>>>>>>>>> >>>>>>>>>>       return input
>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Reshuffle",
>>>>>>>>>>>>>>> Reshuffle.viaRandomKey())
>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>>>>>>>>>>>>>> >>>>>>>>>>     }
>>>>>>>>>>>>>>> >>>>>>>>>>   }
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> 2. If you are using generic types for the results
>>>>>>>>>>>>>>> of ReadAll, you must have the
>>>>>>>>>>>>>>> >>>>>>>>>>    Coders used in its definition and require
>>>>>>>>>>>>>>> consistent types from the data
>>>>>>>>>>>>>>> >>>>>>>>>>    sources. In practice this means we need to add
>>>>>>>>>>>>>>> extra withCoder method(s) on
>>>>>>>>>>>>>>> >>>>>>>>>>    ReadAll, but not the full specs.
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow
>>>>>>>>>>>>>>> this ReadAll pattern. RedisIO
>>>>>>>>>>>>>>> >>>>>>>>>> and CassandraIO already have WIP PRs to do so. So
>>>>>>>>>>>>>>> I wanted to bring this subject
>>>>>>>>>>>>>>> >>>>>>>>>> to the mailing list to hear your opinions, and to
>>>>>>>>>>>>>>> learn whether you see any issues that
>>>>>>>>>>>>>>> >>>>>>>>>> we might be missing with this idea.
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> Also I would like to see if we have consensus to
>>>>>>>>>>>>>>> start consistently using the
>>>>>>>>>>>>>>> >>>>>>>>>> terminology of ReadAll transforms based on Read
>>>>>>>>>>>>>>> and the readAll() method for new
>>>>>>>>>>>>>>> >>>>>>>>>> IOs (at this point, undoing this in the
>>>>>>>>>>>>>>> only remaining inconsistent
>>>>>>>>>>>>>>> >>>>>>>>>> place, JdbcIO, might not be a good idea, but
>>>>>>>>>>>>>>> apart from this we should be ok).
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> I mention this because the recent PR on KafkaIO
>>>>>>>>>>>>>>> based on SDF is doing something
>>>>>>>>>>>>>>> >>>>>>>>>> similar to the old pattern while being called
>>>>>>>>>>>>>>> ReadAll, and maybe it is worth being
>>>>>>>>>>>>>>> >>>>>>>>>> consistent for the benefit of users.
>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>> >>>>>>>>>> Regards,
>>>>>>>>>>>>>>> >>>>>>>>>> Ismaël
>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
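
To make the quoted proposal concrete, here is a minimal, dependency-free
sketch of the ReadAll shape described above: Split, then Reshuffle, then
Read, with full Read specs as the input elements. It deliberately does not
use Beam. `ReadAllSketch`, its `Read` class, `split`, `read`, `readAll` and
the `maxKeys` parameter are all hypothetical names invented for illustration,
and a shuffled list merely stands in for Reshuffle.viaRandomKey().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Illustration only: none of these types are Beam's. */
public class ReadAllSketch {

  /** Hypothetical immutable read spec, standing in for an IO's Read transform. */
  static final class Read {
    final String table;
    final int startKey; // inclusive
    final int endKey;   // exclusive

    Read(String table, int startKey, int endKey) {
      this.table = table;
      this.startKey = startKey;
      this.endKey = endKey;
    }

    /** Returns a copy with a different key range (this is the "pushed in" split info). */
    Read withKeyRange(int start, int end) {
      return new Read(table, start, end);
    }
  }

  /** "Split" step: break one spec into specs of at most maxKeys keys (maxKeys > 0). */
  static List<Read> split(Read read, int maxKeys) {
    List<Read> parts = new ArrayList<>();
    for (int s = read.startKey; s < read.endKey; s += maxKeys) {
      parts.add(read.withKeyRange(s, Math.min(s + maxKeys, read.endKey)));
    }
    return parts;
  }

  /** "Read" step: a fake source that emits one record per key in the range. */
  static List<String> read(Read read) {
    List<String> out = new ArrayList<>();
    for (int k = read.startKey; k < read.endKey; k++) {
      out.add(read.table + "/" + k);
    }
    return out;
  }

  /** ReadAll: the input elements are full Read specs, not a reduced query type. */
  static List<String> readAll(List<Read> reads, int maxKeys) {
    List<Read> parts = new ArrayList<>();
    for (Read r : reads) {
      parts.addAll(split(r, maxKeys));
    }
    Collections.shuffle(parts); // stand-in for a reshuffle that redistributes work
    List<String> results = new ArrayList<>();
    for (Read p : parts) {
      results.addAll(read(p));
    }
    return results;
  }
}
```

Because the element type is the full Read spec, any option later added to
Read reaches readAll without extra methods on the ReadAll side, which is the
maintenance benefit argued for in the quoted message.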
