I don't think an enum of most common closures will work. The input types
are typically generics that are made concrete by the caller who also
provides the closures. I think Luke's (2) is the same idea as my "Java
still assembles it [using opaque Python closures/transforms]". It seems
like an approach to (3). Passing over actual code could address some cases,
but libraries become the issue.
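
To make the "opaque closure" idea in (2) concrete, here is a minimal sketch, assuming plain Java serialization and nothing Beam-specific. OpaqueClosures, SerializableFn and parseIntFn are hypothetical names for illustration only, not Beam APIs. The non-Java SDK would just carry the byte[] as a constant without interpreting it; only the Java side turns it back into a function.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

// Hypothetical helper, not part of Beam: round-trips a closure through bytes.
final class OpaqueClosures {
  private OpaqueClosures() {}

  /** A function that can be Java-serialized (stand-in for a real SDK type). */
  interface SerializableFn<A, B> extends Function<A, B>, Serializable {}

  /** Example closure: a type-conversion function defined on the Java side. */
  static SerializableFn<String, Integer> parseIntFn() {
    return Integer::parseInt;
  }

  /** Serialize the closure; the caller can store the result as raw bytes. */
  static byte[] toBytes(SerializableFn<?, ?> fn) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
      out.writeObject(fn);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }

  /** Restore the closure; fails if the bytes are not a valid serialized object. */
  @SuppressWarnings("unchecked")
  static <A, B> SerializableFn<A, B> fromBytes(byte[] bytes) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return (SerializableFn<A, B>) in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException("serialized function refers to an unknown class", e);
    }
  }
}
```

Calling OpaqueClosures.fromBytes(OpaqueClosures.toBytes(OpaqueClosures.parseIntFn())) gives back a usable function, but only where the defining classes are on the classpath, which is the library problem mentioned above.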

I think it is fair to say that "WriteAll" style would involve entering
unexplored territory.

On the main topic, I think Brian has a pretty strong point, and his type
conversion lambdas are a good example. I did a quick survey and every other
property I could find does seem to fit on the Read, and most IOs have a few
of these closures, for example for extracting timestamps. So maybe we just
need a resolution convention: put them on the ReadAll and have that take
precedence. Then you would be deserializing a Read transform with
insta-crash methods or some such?
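
A minimal sketch of what such a resolution convention could look like, assuming a Read is modeled as a plain property map. ReadConfigResolver and the property names are hypothetical and only for illustration; real Read transforms have typed fields. The order is: defaults first, then whatever the Read element specifies, then whatever is set on the ReadAll, which wins.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for merging Read configuration; not Beam API.
final class ReadConfigResolver {
  private ReadConfigResolver() {}

  /**
   * Resolves the effective configuration for one input element.
   * Later sources win: defaults < element values < ReadAll overrides.
   */
  static Map<String, String> resolve(
      Map<String, String> defaults,
      Map<String, String> element,
      Map<String, String> readAllOverrides) {
    Map<String, String> effective = new LinkedHashMap<>(defaults);
    effective.putAll(element);          // per-element values replace defaults
    effective.putAll(readAllOverrides); // values on the ReadAll take precedence
    return effective;
  }
}
```

With this shape, a closure such as a timestamp extractor set on the ReadAll always replaces the one carried by the element, so the element's copy never has to be usable.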

Kenn

On Tue, Jun 30, 2020 at 10:24 AM Eugene Kirpichov <[email protected]>
wrote:

> Yeah, mainly I just feel like dynamic reads and dynamic writes (and
> perhaps not-yet-invented similar transforms of other kinds) are tightly
> related - they are either very similar, or are duals of each other - so
> they should use the same approach. If they are using different approaches,
> it is a sign that either one of them is being done wrong or that we are
> running into a fundamental limitation of Beam (e.g. difficulty of encoding
> closures compared to encoding elements).
>
> But I agree with Luke that we shouldn't give up on closures. Especially
> with the work that has been done on schemas and SQL, I see no reason why we
> couldn't express closures in a portable restricted sub-language. If we can
> express SQL, we can express many or most use cases of dynamic reads/writes
> - I don't mean that we should actually use SQL (though we *could* - e.g.
> SQL scalar expressions seem powerful enough to express the closures
> appearing in most use cases of FileIO.writeDynamic), I just mean that SQL
> is an existence proof.
>
> (I don't want to rock the boat too much, just thought I'd chime in as this
> topic is dear to my heart)
>
> On Tue, Jun 30, 2020 at 9:59 AM Luke Cwik <[email protected]> wrote:
>
>> Kenn, I'm not too worried about closures since:
>> 1) the expansion service for a transform could have a well-defined set of
>> closures by name that are returned as serialized objects that don't need to
>> be interpretable by the caller
>> 2) the language could store serialized functions of another language as
>> constants
>> 3) generic XLang function support will eventually be needed
>> but I do agree that closures make things more difficult to express than
>> data, which is primarily why we should prefer data over closures when
>> possible and use closures when expressing it with data would be too
>> cumbersome.
>>
>> Brian, so far the cases that have been migrated have shown that the
>> source descriptor and the Read transform are almost the same (some
>> parameters that only impact pipeline construction such as coders differ).
>>
>> On Mon, Jun 29, 2020 at 2:33 PM Brian Hulette <[email protected]>
>> wrote:
>>
>>> Sorry for jumping into this late and casting a vote against the
>>> consensus... but I think I'd prefer standardizing on a pattern like
>>> PCollection<KafkaSourceDescriptor> rather than PCollection<Read>. That
>>> approach clearly separates the parameters that are allowed to vary across a
>>> ReadAll (the ones defined in KafkaSourceDescriptor) from the parameters
>>> that should be constant (other parameters in the Read object, like
>>> SerializedFunctions for type conversions, parameters for different
>>> operating modes, etc...). I think it's helpful to think of the parameters
>>> that are allowed to vary as some "location descriptor", but I imagine IO
>>> authors may want other parameters to vary across a ReadAll as well.
>>>
>>> To me it seems safer to let an IO author "opt-in" to a parameter being
>>> dynamic at execution time.
>>>
>>> Brian
>>>
>>> On Mon, Jun 29, 2020 at 9:26 AM Eugene Kirpichov <[email protected]>
>>> wrote:
>>>
>>>> I'd like to raise one more time the question of consistency between
>>>> dynamic reads and dynamic writes, per my email at the beginning of the
>>>> thread.
>>>> If the community prefers ReadAll to read from Read, then should
>>>> dynamic writes write to Write?
>>>>
>>>> On Mon, Jun 29, 2020 at 8:57 AM Boyuan Zhang <[email protected]>
>>>> wrote:
>>>>
>>>>> It seems like most of us agree on the idea that ReadAll should read
>>>>> from Read. I'm going to update the Kafka ReadAll with the same pattern.
>>>>> Thanks for all your help!
>>>>>
>>>>> On Fri, Jun 26, 2020 at 12:12 PM Chamikara Jayalath <
>>>>> [email protected]> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Jun 26, 2020 at 11:49 AM Luke Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> I would also like to suggest that transforms that implement ReadAll
>>>>>>> via Read should also provide methods like:
>>>>>>>
>>>>>>> // Uses the specified values if unspecified in the input element
>>>>>>> // from the PCollection<Read>.
>>>>>>> withDefaults(Read read);
>>>>>>> // Uses the specified values regardless of what the input element
>>>>>>> // from the PCollection<Read> specifies.
>>>>>>> withOverrides(Read read);
>>>>>>>
>>>>>>> and only add methods that are required at construction time (e.g.
>>>>>>> coders). This way the majority of documentation sits on the Read
>>>>>>> transform.
>>>>>>>
>>>>>>
>>>>>> +0 from me. Sounds like benefits outweigh the drawbacks here and some
>>>>>> of the drawbacks related to cross-language can be overcome through future
>>>>>> advancements.
>>>>>> Thanks for bringing this up Ismaël.
>>>>>>
>>>>>> - Cham
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> On Fri, Jun 26, 2020 at 9:58 AM Luke Cwik <[email protected]> wrote:
>>>>>>>
>>>>>>>> Ismael, it is good to hear that using Read as the input didn't have
>>>>>>>> a bunch of parameters that were being skipped/ignored. Also, for the
>>>>>>>> polymorphism issue you have to rely on the user correctly telling you
>>>>>>>> the type in such a way that it is a common ancestor of all the
>>>>>>>> runtime types that will ever be used. This usually boils down to
>>>>>>>> something like Serializable or DynamicMessage such that the coder
>>>>>>>> that is chosen works for all the runtime types. Using multiple types
>>>>>>>> is a valid use case and would allow for a simpler graph with fewer
>>>>>>>> flattens merging the output from multiple sources.
>>>>>>>>
>>>>>>>> Boyuan, as you have mentioned we can have a coder for KafkaIO.Read
>>>>>>>> which uses schemas even if some of the parameters can't be
>>>>>>>> represented in a meaningful way beyond "bytes". This would be helpful
>>>>>>>> for cross language as well since every parameter would become
>>>>>>>> available if a language could support it (e.g. it could serialize a
>>>>>>>> java function up front and keep it saved as raw bytes within said
>>>>>>>> language). Even if we figure out a better way to do this in the
>>>>>>>> future, we'll have to change the schema for the new way anyway. This
>>>>>>>> would mean that the external version of the transform adopts Row to
>>>>>>>> Read and we drop KafkaSourceDescriptor. The conversion from Row to
>>>>>>>> Read could validate that the parameters make sense (e.g. the bytes
>>>>>>>> are valid serialized functions). The addition of an
>>>>>>>> endReadTime/endReadOffset would make sense for KafkaIO.Read as well
>>>>>>>> and this would enable having a bounded version that could be used for
>>>>>>>> backfills (this doesn't have to be done as part of any current
>>>>>>>> ongoing PR). Essentially any parameter that could be added for a
>>>>>>>> single instance of a Kafka element+restriction would also make sense
>>>>>>>> to the KafkaIO.Read transform since it too is a single instance.
>>>>>>>> There are parameters that would apply to the ReadAll that wouldn't
>>>>>>>> apply to a read and these would be global parameters across all
>>>>>>>> element+restriction pairs such as config overrides or default values.
>>>>>>>>
>>>>>>>> I am convinced that we should do as Ismael is suggesting and use
>>>>>>>> KafkaIO.Read as the type.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jun 25, 2020 at 6:00 PM Chamikara Jayalath <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Discussion regarding cross-language transforms is a slight tangent
>>>>>>>>> here. But I think, in general, it's great if we can use existing
>>>>>>>>> transforms (for example, IO connectors) as cross-language transforms
>>>>>>>>> without having to build more composites (irrespective of whether in
>>>>>>>>> ExternalTransformBuilders or in user pipelines) just to make them
>>>>>>>>> cross-language compatible. A future cross-language compatible
>>>>>>>>> SchemaCoder might help (assuming that works for the Read transform)
>>>>>>>>> but I'm not sure we have a good idea of when we'll get to that state.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Cham
>>>>>>>>>
>>>>>>>>> On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> For unbounded SDF in Kafka, we also consider the
>>>>>>>>>> upgrading/downgrading compatibility in the pipeline update scenario
>>>>>>>>>> (for detailed discussion, please refer to
>>>>>>>>>> https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E).
>>>>>>>>>> To obtain that compatibility, the input of the read SDF must be
>>>>>>>>>> schema-aware.
>>>>>>>>>>
>>>>>>>>>> Thus the major constraint of mapping KafkaSourceDescriptor to
>>>>>>>>>> PCollection<Read> is that KafkaIO.Read also needs to be schema-aware,
>>>>>>>>>> otherwise pipeline updates might fail unnecessarily. Looking into
>>>>>>>>>> KafkaIO.Read, not all necessary fields are compatible with schema,
>>>>>>>>>> for example, SerializedFunction.
>>>>>>>>>>
>>>>>>>>>> I'm kind of confused about why ReadAll<Read, OutputT> is a common
>>>>>>>>>> pattern for SDF based IO. The Read can be a common pattern because
>>>>>>>>>> the input is always a PBegin. But for an SDF based IO, the input can
>>>>>>>>>> be anything. By using Read as input, we will still have the
>>>>>>>>>> maintenance cost when SDF IO supports a new field but Read doesn't
>>>>>>>>>> consume it. For example, we are discussing adding endOffset and
>>>>>>>>>> endReadTime to KafkaSourceDescriptor, which are not used in
>>>>>>>>>> KafkaIO.Read.
>>>>>>>>>>
>>>>>>>>>> On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> We forgot to mention (5) External.Config used in cross-lang, see
>>>>>>>>>>> KafkaIO ExternalTransformBuilder. This approach is the predecessor
>>>>>>>>>>> of (4) and probably a really good candidate to be replaced by the
>>>>>>>>>>> Row based Configuration Boyuan is envisioning (so good to be aware
>>>>>>>>>>> of this).
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the clear explanation, Luke; you mention the real
>>>>>>>>>>> issue(s). All the approaches discussed so far could in the end be
>>>>>>>>>>> easily transformed to produce a PCollection<Read>, and those Read
>>>>>>>>>>> elements could be read by the generic ReadAll transform. Notice
>>>>>>>>>>> that this can be internal in some IOs, e.g. KafkaIO, if they decide
>>>>>>>>>>> not to expose it. I am not saying that we should force every IO to
>>>>>>>>>>> support ReadAll in its public API, but if we do it is probably a
>>>>>>>>>>> good idea to be consistent and name the transform that expects an
>>>>>>>>>>> input PCollection<Read> in the same way. Also notice that using it
>>>>>>>>>>> will save us from the maintenance issues discussed in my previous
>>>>>>>>>>> email.
>>>>>>>>>>>
>>>>>>>>>>> Back to the main concern, the consequences of expansion based on
>>>>>>>>>>> Read: so far I have not seen consequences for the splitting part,
>>>>>>>>>>> which maps really nicely assuming the Partition info / Restriction
>>>>>>>>>>> is available as part of Read. So far there are no serialization
>>>>>>>>>>> issues because Beam is already enforcing this. Notice that ReadAll
>>>>>>>>>>> expansion is almost ‘equivalent’ to a poor man's SDF, at least for
>>>>>>>>>>> the Bounded case (see the code in my previous email). For the other
>>>>>>>>>>> points:
>>>>>>>>>>>
>>>>>>>>>>> > a) Do all properties set on a Read apply to the ReadAll? For
>>>>>>>>>>> > example, the Kafka Read implementation allows you to set the key
>>>>>>>>>>> > and value deserializers which are also used to dictate the output
>>>>>>>>>>> > PCollection type. It also allows you to set how the watermark
>>>>>>>>>>> > should be computed. Technically a user may want the watermark
>>>>>>>>>>> > computation to be configurable per Read and they may also want an
>>>>>>>>>>> > output type which is polymorphic (e.g. PCollection<Serializable>).
>>>>>>>>>>>
>>>>>>>>>>> Most of the time they do, but for parametric types we cannot
>>>>>>>>>>> support different types in the outputs of the Read, or at least I
>>>>>>>>>>> did not find how to do so (is there a way to use multiple output
>>>>>>>>>>> Coders in Beam?). We saw this in CassandraIO and we were discussing
>>>>>>>>>>> explicitly adding these Coder- or Serializer-specific methods to
>>>>>>>>>>> the ReadAll transform. This is less nice because it will imply some
>>>>>>>>>>> repeated methods, but it is still a compromise to gain the other
>>>>>>>>>>> advantages. I suppose the watermark case you mention is similar
>>>>>>>>>>> because you may want the watermark to behave differently in each
>>>>>>>>>>> Read and we probably don’t support this, so it corresponds to the
>>>>>>>>>>> polymorphic category.
>>>>>>>>>>>
>>>>>>>>>>> > b) Read extends PTransform which brings its own object modelling
>>>>>>>>>>> > concerns.
>>>>>>>>>>>
>>>>>>>>>>> > During the implementations of ReadAll(PCollection<Read>), was it
>>>>>>>>>>> > discovered that some properties became runtime errors or were
>>>>>>>>>>> > ignored if they were set? If no, then the code deduplication is
>>>>>>>>>>> > likely worth it because we also get a lot of javadoc
>>>>>>>>>>> > deduplication, but if yes is this an acceptable user experience?
>>>>>>>>>>>
>>>>>>>>>>> No, not so far. This is an interesting part: notice that the Read
>>>>>>>>>>> translation ends up delegating the read bits to the ReadFn part of
>>>>>>>>>>> ReadAll, so the ReadFn is the real read and must be aware of and
>>>>>>>>>>> use all the parameters.
>>>>>>>>>>>
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public PCollection<SolrDocument> expand(PBegin input) {
>>>>>>>>>>>       return input.apply("Create", Create.of(this)).apply("ReadAll", readAll());
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>> I might be missing something for the Unbounded SDF case, which is
>>>>>>>>>>> the only case we have not explored so far. I think one easy way to
>>>>>>>>>>> see the limitations would be, in the ongoing KafkaIO SDF based
>>>>>>>>>>> implementation, to try to map KafkaSourceDescriptor to the extra
>>>>>>>>>>> PCollection<Read> and do the Read logic on the ReadAll with the SDF
>>>>>>>>>>> to see which constraints we hit. The polymorphic ones will be there
>>>>>>>>>>> for sure, maybe others will appear (not sure). It would be
>>>>>>>>>>> interesting to see if we have a real gain in the maintenance
>>>>>>>>>>> points, but let’s not forget that KafkaIO has a LOT of knobs, so
>>>>>>>>>>> the generic implementation could probably be relatively complex.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > I had mentioned that approach 1 and approach 2 work for cross
>>>>>>>>>>> > language. The difference being that the cross language transform
>>>>>>>>>>> > would take a well known definition and convert it to the Read
>>>>>>>>>>> > transform. A normal user would have a pipeline that would look
>>>>>>>>>>> > like:
>>>>>>>>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
>>>>>>>>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) ->
>>>>>>>>>>> >    PCollection<Output>
>>>>>>>>>>> >
>>>>>>>>>>> > And in the cross language case this would look like:
>>>>>>>>>>> > 1: PCollection<Row of SourceDescriptor> ->
>>>>>>>>>>> >    PTransform(Convert Row to Read) -> PCollection<Read> ->
>>>>>>>>>>> >    PTransform(ReadAll) -> PCollection<Output>
>>>>>>>>>>> > 2: PCollection<Row of SourceDescriptor> ->
>>>>>>>>>>> >    PTransform(Convert Row to SourceDescriptor) ->
>>>>>>>>>>> >    PCollection<SourceDescriptor> -> PTransform(ReadAll) ->
>>>>>>>>>>> >    PCollection<Output>*
>>>>>>>>>>> > * note that PTransform(Convert Row to SourceDescriptor) only
>>>>>>>>>>> > exists since we haven't solved how to use schemas with language
>>>>>>>>>>> > bound types in a cross language way. SchemaCoder isn't portable
>>>>>>>>>>> > but RowCoder is, which is why the conversion step exists. We could
>>>>>>>>>>> > have a solution for this at some point in time.
>>>>>>>>>>> >
>>>>>>>>>>> > My concern with using Read was around:
>>>>>>>>>>> > a) Do all properties set on a Read apply to the ReadAll? For
>>>>>>>>>>> > example, the Kafka Read implementation allows you to set the key
>>>>>>>>>>> > and value deserializers which are also used to dictate the output
>>>>>>>>>>> > PCollection type. It also allows you to set how the watermark
>>>>>>>>>>> > should be computed. Technically a user may want the watermark
>>>>>>>>>>> > computation to be configurable per Read and they may also want an
>>>>>>>>>>> > output type which is polymorphic (e.g. PCollection<Serializable>).
>>>>>>>>>>> > b) Read extends PTransform which brings its own object modelling
>>>>>>>>>>> > concerns.
>>>>>>>>>>> >
>>>>>>>>>>> > During the implementations of ReadAll(PCollection<Read>), was it
>>>>>>>>>>> > discovered that some properties became runtime errors or were
>>>>>>>>>>> > ignored if they were set? If no, then the code deduplication is
>>>>>>>>>>> > likely worth it because we also get a lot of javadoc
>>>>>>>>>>> > deduplication, but if yes is this an acceptable user experience?
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>> >>
>>>>>>>>>>> >> I believe that the initial goal of unifying ReadAll as a general
>>>>>>>>>>> >> "PTransform<PCollection<Read>, PCollection<OutputType>>" was to
>>>>>>>>>>> >> reduce the amount of code duplication and the error-prone
>>>>>>>>>>> >> approach related to it. It makes a lot of sense since usually we
>>>>>>>>>>> >> have all the needed configuration set in Read objects and, as
>>>>>>>>>>> >> Ismaël mentioned, ReadAll will consist mostly of only
>>>>>>>>>>> >> Split-Shuffle-Read stages. So this case can usually be unified
>>>>>>>>>>> >> by using PCollection<Read> as input.
>>>>>>>>>>> >>
>>>>>>>>>>> >> On the other hand, we have another need to use Java IOs as
>>>>>>>>>>> >> cross-language transforms (as Luke described), which seems to
>>>>>>>>>>> >> have only part in common with the previous pattern of using
>>>>>>>>>>> >> ReadAll.
>>>>>>>>>>> >>
>>>>>>>>>>> >> I’d prefer to have only one concept of read configuration for
>>>>>>>>>>> >> all needs, but it seems that's not easy, so I’m more in favour
>>>>>>>>>>> >> of Luke and Boyuan's approach with schema. Though maybe ReadAll
>>>>>>>>>>> >> is not a very suitable name in this case because it may cause
>>>>>>>>>>> >> some confusion with the previous pattern of ReadAll uses.
>>>>>>>>>>> >>
>>>>>>>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>> >>
>>>>>>>>>>> >> Sorry for the typo. I mean I think we can go with (3) and
>>>>>>>>>>> (4): use the data type that is schema-aware as the input of ReadAll.
>>>>>>>>>>> >>
>>>>>>>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> Thanks for the summary, Cham!
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> I think we can go with (2) and (4): use the data type that is
>>>>>>>>>>> >>> schema-aware as the input of ReadAll.
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> Converting Read into ReadAll helps us to stick with SDF-like
>>>>>>>>>>> >>> IO. But only having (3) is not enough to solve the problem of
>>>>>>>>>>> >>> using ReadAll in the x-lang case.
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> The key point of ReadAll is that the input type of ReadAll
>>>>>>>>>>> >>> should be able to cross language boundaries and stay compatible
>>>>>>>>>>> >>> across updating/downgrading. After investigating some
>>>>>>>>>>> >>> possibilities (pure Java POJO with custom coder, protobuf,
>>>>>>>>>>> >>> row/schema) in Kafka usage, we find that row/schema fits our
>>>>>>>>>>> >>> needs most. Here comes (4). I believe that using Read as input
>>>>>>>>>>> >>> of ReadAll makes sense in some cases, but I also think not all
>>>>>>>>>>> >>> IOs have the same need. I would treat Read as a special type as
>>>>>>>>>>> >>> long as the Read is schema-aware.
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> I see. So it seems like there are three options discussed so
>>>>>>>>>>> >>>> far when it comes to defining source descriptors for ReadAll
>>>>>>>>>>> >>>> type transforms:
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> (1) Use Read PTransform as the element type of the input
>>>>>>>>>>> >>>>     PCollection
>>>>>>>>>>> >>>> (2) Use a POJO that describes the source as the data element
>>>>>>>>>>> >>>>     of the input PCollection
>>>>>>>>>>> >>>> (3) Provide a converter as a function to the Read transform
>>>>>>>>>>> >>>>     which essentially will convert it to a ReadAll (what
>>>>>>>>>>> >>>>     Eugene mentioned)
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> I feel like (3) is more suitable for a related set of source
>>>>>>>>>>> >>>> descriptions such as files.
>>>>>>>>>>> >>>> (1) will allow the most code reuse but seems like it will make
>>>>>>>>>>> >>>> it hard to use the ReadAll transform as a cross-language
>>>>>>>>>>> >>>> transform and will break the separation of construction time
>>>>>>>>>>> >>>> and runtime constructs.
>>>>>>>>>>> >>>> (2) could result in less code reuse if not careful but will
>>>>>>>>>>> >>>> make the transform easier to use as a cross-language transform
>>>>>>>>>>> >>>> without additional modifications.
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> Also, with SDF, we can create ReadAll-like transforms that are
>>>>>>>>>>> >>>> more efficient. So we might be able to just define all sources
>>>>>>>>>>> >>>> in that format and make Read transforms just an easy to use
>>>>>>>>>>> >>>> composite built on top of that (by adding a preceding Create
>>>>>>>>>>> >>>> transform).
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> Thanks,
>>>>>>>>>>> >>>> Cham
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> I believe we do require PTransforms to be serializable
>>>>>>>>>>> since anonymous DoFns typically capture the enclosing PTransform.
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>> >>>>>>
>>>>>>>>>>> >>>>>> Seems like Read in PCollection<Read> refers to a
>>>>>>>>>>> transform, at least here:
>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>>>>>>>>> >>>>>>
>>>>>>>>>>> >>>>>> I'm in favour of separating construction time transforms
>>>>>>>>>>> >>>>>> from execution time data objects that we store in
>>>>>>>>>>> >>>>>> PCollections as Luke mentioned. Also, we don't guarantee
>>>>>>>>>>> >>>>>> that PTransform is serializable so users have the additional
>>>>>>>>>>> >>>>>> complexity of providing a coder whenever a PTransform is
>>>>>>>>>>> >>>>>> used as a data object.
>>>>>>>>>>> >>>>>> Also, agree with Boyuan that using simple Java objects that
>>>>>>>>>>> >>>>>> are convertible to Beam Rows allows us to make these
>>>>>>>>>>> >>>>>> transforms available to other SDKs through the
>>>>>>>>>>> >>>>>> cross-language transforms. Using transforms or complex
>>>>>>>>>>> >>>>>> sources as data objects will probably make this difficult.
>>>>>>>>>>> >>>>>>
>>>>>>>>>>> >>>>>> Thanks,
>>>>>>>>>>> >>>>>> Cham
>>>>>>>>>>> >>>>>>
>>>>>>>>>>> >>>>>>
>>>>>>>>>>> >>>>>>
>>>>>>>>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>> >>>>>>>
>>>>>>>>>>> >>>>>>> Hi Ismael,
>>>>>>>>>>> >>>>>>>
>>>>>>>>>>> >>>>>>> I think the ReadAll in the IO connector refers to the IO
>>>>>>>>>>> >>>>>>> with SDF implementation regardless of the type of input,
>>>>>>>>>>> >>>>>>> where Read refers to UnboundedSource. One major pushback
>>>>>>>>>>> >>>>>>> against using KafkaIO.Read as the source description is
>>>>>>>>>>> >>>>>>> that not all configurations of KafkaIO.Read are meaningful
>>>>>>>>>>> >>>>>>> to populate at execution time. Also, when thinking about
>>>>>>>>>>> >>>>>>> x-lang usage, the source description also needs to cross
>>>>>>>>>>> >>>>>>> language boundaries. As Luke mentioned, it's quite easy to
>>>>>>>>>>> >>>>>>> infer a Schema from an AutoValue object:
>>>>>>>>>>> >>>>>>> KafkaSourceDescription.java. Then the coder of this
>>>>>>>>>>> >>>>>>> schema-aware object will be a SchemaCoder. When crossing
>>>>>>>>>>> >>>>>>> language boundaries, it's also easy to convert a Row into
>>>>>>>>>>> >>>>>>> the source description: Convert.fromRows.
>>>>>>>>>>> >>>>>>>
>>>>>>>>>>> >>>>>>>
>>>>>>>>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>> >>>>>>>> To provide additional context, the KafkaIO ReadAll
>>>>>>>>>>> >>>>>>>> transform takes a PCollection<KafkaSourceDescriptor>. This
>>>>>>>>>>> >>>>>>>> KafkaSourceDescriptor is a POJO that contains the
>>>>>>>>>>> >>>>>>>> configurable parameters for reading from Kafka. This is
>>>>>>>>>>> >>>>>>>> different from the pattern that Ismael listed because they
>>>>>>>>>>> >>>>>>>> take PCollection<Read> as input and the Read is the same
>>>>>>>>>>> >>>>>>>> as the Read PTransform class used for the non read all
>>>>>>>>>>> >>>>>>>> case.
>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>> >>>>>>>> The KafkaSourceDescriptor does lead to duplication since
>>>>>>>>>>> >>>>>>>> parameters used to configure the transform have to be
>>>>>>>>>>> >>>>>>>> copied over to the source descriptor but decouples how a
>>>>>>>>>>> >>>>>>>> transform is specified from the object that describes what
>>>>>>>>>>> >>>>>>>> needs to be done. I believe Ismael's point is that we
>>>>>>>>>>> >>>>>>>> wouldn't need such a decoupling.
>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>> >>>>>>>> Another area that hasn't been discussed and I believe is a
>>>>>>>>>>> >>>>>>>> non-issue is that the Beam Java SDK has the most IO
>>>>>>>>>>> >>>>>>>> connectors and we would want to use the IO implementations
>>>>>>>>>>> >>>>>>>> within Beam Go and Beam Python. This brings in its own set
>>>>>>>>>>> >>>>>>>> of issues related to versioning and compatibility for the
>>>>>>>>>>> >>>>>>>> wire format and how one parameterizes such transforms. The
>>>>>>>>>>> >>>>>>>> wire format issue can be solved with either approach by
>>>>>>>>>>> >>>>>>>> making sure that the cross language expansion always takes
>>>>>>>>>>> >>>>>>>> the well known format (whatever it may be) and converts it
>>>>>>>>>>> >>>>>>>> into Read/KafkaSourceDescriptor/... object that is then
>>>>>>>>>>> >>>>>>>> passed to the ReadAll transform. Boyuan has been looking
>>>>>>>>>>> >>>>>>>> to make the KafkaSourceDescriptor have a schema so it can
>>>>>>>>>>> >>>>>>>> be represented as a row and this can be done easily using
>>>>>>>>>>> >>>>>>>> the AutoValue integration (I don't believe there is
>>>>>>>>>>> >>>>>>>> anything preventing someone from writing a schema row ->
>>>>>>>>>>> >>>>>>>> Read -> row adapter or also using the AutoValue
>>>>>>>>>>> >>>>>>>> configuration if the transform is also an AutoValue).
>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>> >>>>>>>> I would be more for the code duplication and separation of
>>>>>>>>>>> >>>>>>>> concerns provided by using a different object to represent
>>>>>>>>>>> >>>>>>>> the contents of the PCollection from the pipeline
>>>>>>>>>>> >>>>>>>> construction time PTransform.
>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>> >>>>>>>>> Hi Ismael,
>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>> >>>>>>>>> Thanks for taking this on. Have you considered an
>>>>>>>>>>> >>>>>>>>> approach similar (or dual) to FileIO.write(), where we in
>>>>>>>>>>> >>>>>>>>> a sense also have to configure a dynamic number of
>>>>>>>>>>> >>>>>>>>> different IO transforms of the same type (file writes)?
>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>> >>>>>>>>> E.g. how in this example we configure many aspects of
>>>>>>>>>>> >>>>>>>>> many file writes:
>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
>>>>>>>>>>> >>>>>>>>>      .by(Transaction::getType)
>>>>>>>>>>> >>>>>>>>>      // Convert the data to be written to CSVSink
>>>>>>>>>>> >>>>>>>>>      .via(tx -> tx.getType().toFields(tx),
>>>>>>>>>>> >>>>>>>>>           type -> new CSVSink(type.getFieldNames()))
>>>>>>>>>>> >>>>>>>>>      .to(".../path/to/")
>>>>>>>>>>> >>>>>>>>>      .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>> >>>>>>>>> we could do something similar for many JdbcIO reads:
>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>> >>>>>>>>> // user-specific type from which all the read parameters can be inferred
>>>>>>>>>>> >>>>>>>>> PCollection<Bar> bars;
>>>>>>>>>>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>>>>>>>>>>> >>>>>>>>>   .fromQuery(bar -> ...compute query for this bar...)
>>>>>>>>>>> >>>>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
>>>>>>>>>>> >>>>>>>>>   .withBatchSize(bar -> ...compute batch size for this bar...)
>>>>>>>>>>> >>>>>>>>>   ...etc);
>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>> >>>>>>>>>> Hello,
>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>> >>>>>>>>>> (my apologies for the long email but this requires
>>>>>>>>>>> >>>>>>>>>> context)
>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>> >>>>>>>>>> As part of the move from Source based IOs to DoFn based
>>>>>>>>>>> >>>>>>>>>> ones, one pattern emerged due to the composable nature
>>>>>>>>>>> >>>>>>>>>> of DoFn. The idea is to have a different kind of
>>>>>>>>>>> >>>>>>>>>> composable reads where we take a PCollection of
>>>>>>>>>>> >>>>>>>>>> different sorts of intermediate specifications e.g.
>>>>>>>>>>> >>>>>>>>>> tables, queries, etc, for example:
>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>> >>>>>>>>>> JdbcIO:
>>>>>>>>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends
>>>>>>>>>>> >>>>>>>>>>     PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>> >>>>>>>>>> RedisIO:
>>>>>>>>>>> >>>>>>>>>> ReadAll extends
>>>>>>>>>>> >>>>>>>>>>     PTransform<PCollection<String>, PCollection<KV<String, String>>>
>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>>>>>>>>>>> >>>>>>>>>> ReadAll extends
>>>>>>>>>>> >>>>>>>>>>     PTransform<PCollection<HBaseQuery>, PCollection<Result>>
>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>> >>>>>>>>>> These patterns enabled richer use cases like doing
>>>>>>>>>>> multiple queries in the same
>>>>>>>>>>> >>>>>>>>>> Pipeline, querying based on key patterns or querying
>>>>>>>>>>> from multiple tables at the
>>>>>>>>>>> >>>>>>>>>> same time but came with some maintenance issues:
>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>> >>>>>>>>>> - We ended up needing to add to the ReadAll
>>>>>>>>>>> transforms the parameters for
>>>>>>>>>>> >>>>>>>>>>   missing information, so we ended up with lots of
>>>>>>>>>>> duplicated with* methods and
>>>>>>>>>>> >>>>>>>>>>   error-prone code copied from the Read transforms into the
>>>>>>>>>>> ReadAll transforms.
>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>> >>>>>>>>>> - When you require new parameters you have to expand
>>>>>>>>>>> the input parameters of the
>>>>>>>>>>> >>>>>>>>>>   intermediary specification into something that
>>>>>>>>>>> resembles the full `Read`
>>>>>>>>>>> >>>>>>>>>>   definition. For example, imagine you want to read
>>>>>>>>>>> from multiple tables or
>>>>>>>>>>> >>>>>>>>>>   servers as part of the same pipeline, but this was
>>>>>>>>>>> not in the intermediate
>>>>>>>>>>> >>>>>>>>>>   specification: you end up adding those extra methods
>>>>>>>>>>> (duplicating more code)
>>>>>>>>>>> >>>>>>>>>>   just to get close to the full Read spec.
>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>> >>>>>>>>>> - If new parameters are added to the Read method we
>>>>>>>>>>> end up adding them
>>>>>>>>>>> >>>>>>>>>>   systematically to the ReadAll transform too so they
>>>>>>>>>>> are taken into account.
>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>> >>>>>>>>>> Due to these issues I recently did a change to test a
>>>>>>>>>>> new approach that is
>>>>>>>>>>> >>>>>>>>>> simpler, more complete and maintainable. The code
>>>>>>>>>>> became:
>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>,
>>>>>>>>>>> PCollection<Result>>
>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>> >>>>>>>>>> With this approach users benefit from any
>>>>>>>>>>> improvements to the parameters of the normal
>>>>>>>>>>> >>>>>>>>>> Read, because they have the full Read
>>>>>>>>>>> parameters available. But of course there are
>>>>>>>>>>> >>>>>>>>>> some minor caveats:
>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>> >>>>>>>>>> 1. You need to push some information into normal
>>>>>>>>>>> Reads, for example
>>>>>>>>>>> >>>>>>>>>>    partition boundary information or Restriction
>>>>>>>>>>> information (in the SDF
>>>>>>>>>>> >>>>>>>>>>    case).  Notice that this consistent approach to
>>>>>>>>>>> ReadAll produces a simple
>>>>>>>>>>> >>>>>>>>>>    pattern that ends up being almost reusable between
>>>>>>>>>>> IOs (e.g. the non-SDF
>>>>>>>>>>> >>>>>>>>>>    case):
>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>> >>>>>>>>>>   public static class ReadAll extends
>>>>>>>>>>> PTransform<PCollection<Read>,
>>>>>>>>>>> >>>>>>>>>> PCollection<SolrDocument>> {
>>>>>>>>>>> >>>>>>>>>>     @Override
>>>>>>>>>>> >>>>>>>>>>     public PCollection<SolrDocument>
>>>>>>>>>>> expand(PCollection<Read> input) {
>>>>>>>>>>> >>>>>>>>>>       return input
>>>>>>>>>>> >>>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>>>>>>>>>> >>>>>>>>>>           .apply("Reshuffle",
>>>>>>>>>>> Reshuffle.viaRandomKey())
>>>>>>>>>>> >>>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>>>>>>>>>> >>>>>>>>>>     }
>>>>>>>>>>> >>>>>>>>>>   }
>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>> >>>>>>>>>> 2. If you are using generic types for the results of
>>>>>>>>>>> ReadAll, you must have the
>>>>>>>>>>> >>>>>>>>>>    Coders used in its definition and require
>>>>>>>>>>> consistent types from the data
>>>>>>>>>>> >>>>>>>>>>    sources. In practice this means we need to add
>>>>>>>>>>> extra withCoder method(s) on
>>>>>>>>>>> >>>>>>>>>>    ReadAll, but not the full specs.
>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow this
>>>>>>>>>>> ReadAll pattern. RedisIO
>>>>>>>>>>> >>>>>>>>>> and CassandraIO already have WIP PRs to do so. So I
>>>>>>>>>>> wanted to bring this subject
>>>>>>>>>>> >>>>>>>>>> to the mailing list to see your opinions, and if you
>>>>>>>>>>> see any sort of issues that
>>>>>>>>>>> >>>>>>>>>> we might be missing with this idea.
>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>> >>>>>>>>>> Also I would like to see if we have consensus to
>>>>>>>>>>> start consistently using the
>>>>>>>>>>> >>>>>>>>>> terminology of ReadAll transforms based on Read and
>>>>>>>>>>> the readAll() method for new
>>>>>>>>>>> >>>>>>>>>> IOs (at this point, undoing this in the only
>>>>>>>>>>> remaining inconsistent
>>>>>>>>>>> >>>>>>>>>> place, JdbcIO, might not be a good idea, but apart from
>>>>>>>>>>> this we should be ok).
>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>> >>>>>>>>>> I mention this because the recent PR on KafkaIO based
>>>>>>>>>>> on SDF is doing something
>>>>>>>>>>> >>>>>>>>>> similar to the old pattern while being called ReadAll,
>>>>>>>>>>> and maybe it is worth being
>>>>>>>>>>> >>>>>>>>>> consistent for the benefit of users.
>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>> >>>>>>>>>> Regards,
>>>>>>>>>>> >>>>>>>>>> Ismaël
>>>>>>>>>>> >>
>>>>>>>>>>> >>
>>>>>>>>>>>
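For readers who want the Split -> Reshuffle -> Read shape from Ismaël's message in runnable form, here is a minimal sketch in plain Java with no Beam dependency. It only models the idea that each input element is a full read spec; `ReadAllSketch`, `ReadSpec`, `split`, `read` and the `maxKeys`/`seed` parameters are all hypothetical names, and the shuffle merely stands in for `Reshuffle.viaRandomKey()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Plain-Java model of the Split -> Reshuffle -> Read shape; no Beam types are used.
final class ReadAllSketch {

  /** Hypothetical stand-in for an IO's Read spec: a table plus a half-open key range. */
  static final class ReadSpec {
    final String table;
    final int startKey;
    final int endKey;

    ReadSpec(String table, int startKey, int endKey) {
      this.table = table;
      this.startKey = startKey;
      this.endKey = endKey;
    }
  }

  /** "Split": break one spec into sub-specs covering at most maxKeys keys each. */
  static List<ReadSpec> split(ReadSpec spec, int maxKeys) {
    List<ReadSpec> parts = new ArrayList<>();
    for (int start = spec.startKey; start < spec.endKey; start += maxKeys) {
      parts.add(new ReadSpec(spec.table, start, Math.min(start + maxKeys, spec.endKey)));
    }
    return parts;
  }

  /** "Read": produce one fake record per key of a sub-spec. */
  static List<String> read(ReadSpec spec) {
    List<String> out = new ArrayList<>();
    for (int key = spec.startKey; key < spec.endKey; key++) {
      out.add(spec.table + ":" + key);
    }
    return out;
  }

  /** ReadAll: the same three stages applied to a whole collection of specs. */
  static List<String> readAll(List<ReadSpec> specs, int maxKeys, long seed) {
    List<ReadSpec> parts = new ArrayList<>();
    for (ReadSpec spec : specs) {
      parts.addAll(split(spec, maxKeys));
    }
    Collections.shuffle(parts, new Random(seed)); // stands in for the Reshuffle step
    List<String> out = new ArrayList<>();
    for (ReadSpec part : parts) {
      out.addAll(read(part));
    }
    return out;
  }

  public static void main(String[] args) {
    List<ReadSpec> specs =
        Arrays.asList(new ReadSpec("users", 0, 5), new ReadSpec("orders", 10, 13));
    System.out.println(readAll(specs, 2, 42L).size()); // prints 8
  }
}
```

Because every element carries its own complete spec, nothing about the table or key range has to be configured on the ReadAll step itself, which is the code-reuse argument made in the message above.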
>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > I had mentioned that approach 1 and approach 2 work for cross
>>>>>>>>>>> language. The difference being that the cross language transform 
>>>>>>>>>>> would take
>>>>>>>>>>> a well known definition and convert it to the Read transform. A 
>>>>>>>>>>> normal user
>>>>>>>>>>> would have a pipeline that would look like:
>>>>>>>>>>> > 1: PCollection<Read> -> PTransform(ReadAll) ->
>>>>>>>>>>> PCollection<Output>
>>>>>>>>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) ->
>>>>>>>>>>> PCollection<Output>
>>>>>>>>>>> >
>>>>>>>>>>> > And in the cross language case this would look like:
>>>>>>>>>>> > 1: PCollection<Row of SourceDescriptor> -> PTransform(Convert
>>>>>>>>>>> Row to Read) -> PCollection<Read> -> PTransform(ReadAll) ->
>>>>>>>>>>> PCollection<Output>
>>>>>>>>>>> > 2: PCollection<Row of SourceDescriptor> -> PTransform(Convert
>>>>>>>>>>> Row to SourceDescriptor) -> PCollection<SourceDescriptor> ->
>>>>>>>>>>> PTransform(ReadAll) -> PCollection<Output>*
>>>>>>>>>>> > * note that PTransform(Convert Row to SourceDescriptor) only
>>>>>>>>>>> exists since we haven't solved how to use schemas with language 
>>>>>>>>>>> bound types
>>>>>>>>>>> in a cross language way. SchemaCoder isn't portable but RowCoder is 
>>>>>>>>>>> which
>>>>>>>>>>> is why the conversion step exists. We could have a solution for 
>>>>>>>>>>> this at
>>>>>>>>>>> some point in time.
>>>>>>>>>>> >
>>>>>>>>>>> > My concern with using Read was around:
>>>>>>>>>>> > a) Do all properties set on a Read apply to the ReadAll? For
>>>>>>>>>>> example, the Kafka Read implementation allows you to set the key 
>>>>>>>>>>> and value
>>>>>>>>>>> deserializers which are also used to dictate the output PCollection 
>>>>>>>>>>> type.
>>>>>>>>>>> It also allows you to set how the watermark should be computed. 
>>>>>>>>>>> Technically
>>>>>>>>>>> a user may want the watermark computation to be configurable per 
>>>>>>>>>>> Read and
>>>>>>>>>>> they may also want an output type which is polymorphic (e.g.
>>>>>>>>>>> PCollection<Serializable>).
>>>>>>>>>>> > b) Read extends PTransform which brings its own object
>>>>>>>>>>> modelling concerns.
>>>>>>>>>>> >
>>>>>>>>>>> > During the implementations of ReadAll(PCollection<Read>), was
>>>>>>>>>>> it discovered that some properties became runtime errors or were 
>>>>>>>>>>> ignored if
>>>>>>>>>>> they were set? If no, then the code deduplication is likely worth it
>>>>>>>>>>> because we also get a lot of javadoc deduplication, but if yes is 
>>>>>>>>>>> this an
>>>>>>>>>>> acceptable user experience?
>>>>>>>>>>> >
>>>>>>>>>>> >
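The "Convert Row to SourceDescriptor" step in Luke's cross-language pipelines can be pictured roughly as below. This is a sketch under stated assumptions, not Beam code: the row is modelled as a plain field-name to value map rather than a Beam Row, and `RowToDescriptorSketch`, `SourceDescriptor`, `fromRow` and the field names `topic`, `partition` and `startOffset` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of converting a language-neutral row into a language-bound descriptor.
final class RowToDescriptorSketch {

  /** Hypothetical descriptor POJO; the fields are illustrative only. */
  static final class SourceDescriptor {
    final String topic;
    final int partition;
    final long startOffset;

    SourceDescriptor(String topic, int partition, long startOffset) {
      this.topic = topic;
      this.partition = partition;
      this.startOffset = startOffset;
    }
  }

  /** Converts a row (modelled as a field-name -> value map) into the descriptor. */
  static SourceDescriptor fromRow(Map<String, Object> row) {
    Object topic = row.get("topic");
    if (!(topic instanceof String)) {
      throw new IllegalArgumentException("row needs a string 'topic' field");
    }
    // Optional numeric fields fall back to defaults when the row omits them.
    int partition = ((Number) row.getOrDefault("partition", 0)).intValue();
    long startOffset = ((Number) row.getOrDefault("startOffset", 0L)).longValue();
    return new SourceDescriptor((String) topic, partition, startOffset);
  }

  public static void main(String[] args) {
    Map<String, Object> row = new HashMap<>();
    row.put("topic", "events");
    row.put("partition", 3);
    SourceDescriptor d = fromRow(row);
    System.out.println(d.topic + "/" + d.partition + "@" + d.startOffset); // prints events/3@0
  }
}
```

The point of the extra step is that only the map-like row has to be understood by every SDK; the descriptor class stays private to the Java side.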
>>>>>>>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>> >>
>>>>>>>>>>> >> I believe that the initial goal of unifying ReadAll as a
>>>>>>>>>>> general "PTransform<PCollection<Read>, PCollection<OutputType>>"
>>>>>>>>>>> was to
>>>>>>>>>>> reduce the amount of code duplication and the error-prone approach
>>>>>>>>>>> related to
>>>>>>>>>>> this. It makes a lot of sense, since usually we have all the needed
>>>>>>>>>>> configuration
>>>>>>>>>>> set in Read objects and, as Ismaël mentioned, ReadAll will consist
>>>>>>>>>>> mostly
>>>>>>>>>>> of only Split-Shuffle-Read stages.  So this case can usually be
>>>>>>>>>>> unified by
>>>>>>>>>>> using PCollection<Read> as input.
>>>>>>>>>>> >>
>>>>>>>>>>> >> On the other hand, we have another need to use Java IOs as
>>>>>>>>>>> cross-language transforms (as Luke described), which seems to overlap
>>>>>>>>>>> only partly
>>>>>>>>>>> with the previous ReadAll usage pattern.
>>>>>>>>>>> >>
>>>>>>>>>>> >> I’d prefer to have only one concept of read
>>>>>>>>>>> configuration for all needs, but it seems that is not easy, so I’d
>>>>>>>>>>> lean
>>>>>>>>>>> towards Luke and Boyuan's schema-based approach. Though maybe
>>>>>>>>>>> ReadAll is
>>>>>>>>>>> not a very suitable name in this case, because it could cause some
>>>>>>>>>>> confusion with the previous ReadAll usage pattern.
>>>>>>>>>>> >>
>>>>>>>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>> >>
>>>>>>>>>>> >> Sorry for the typo. I mean I think we can go with (3) and
>>>>>>>>>>> (4): use the data type that is schema-aware as the input of ReadAll.
>>>>>>>>>>> >>
>>>>>>>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> Thanks for the summary, Cham!
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> I think we can go with (2) and (4): use the data type that
>>>>>>>>>>> is schema-aware as the input of ReadAll.
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> Converting Read into ReadAll helps us to stick with SDF-like
>>>>>>>>>>> IO. But only having  (3) is not enough to solve the problem of using
>>>>>>>>>>> ReadAll in x-lang case.
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> The key point of ReadAll is that the input type of ReadAll
>>>>>>>>>>> should be able to cross language boundaries and stay compatible
>>>>>>>>>>> across
>>>>>>>>>>> upgrades/downgrades. After investigating some possibilities (pure
>>>>>>>>>>> Java POJO
>>>>>>>>>>> with custom coder, protobuf, row/schema) for the Kafka use case, we
>>>>>>>>>>> found that
>>>>>>>>>>> row/schema fits our needs best. Hence (4). I believe that
>>>>>>>>>>> using Read
>>>>>>>>>>> as input of ReadAll makes sense in some cases, but I also think not 
>>>>>>>>>>> all IOs
>>>>>>>>>>> have the same need. I would treat Read as a special type as long as 
>>>>>>>>>>> the
>>>>>>>>>>> Read is schema-aware.
>>>>>>>>>>> >>>
>>>>>>>>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> I see. So it seems like there are three options discussed
>>>>>>>>>>> so far when it comes to defining source descriptors for ReadAll type
>>>>>>>>>>> transforms
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> (1) Use Read PTransform as the element type of the input
>>>>>>>>>>> PCollection
>>>>>>>>>>> >>>> (2) Use a POJO that describes the source as the data
>>>>>>>>>>> element of the input PCollection
>>>>>>>>>>> >>>> (3) Provide a converter as a function to the Read transform
>>>>>>>>>>> which essentially will convert it to a ReadAll (what Eugene 
>>>>>>>>>>> mentioned)
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> I feel like (3) is more suitable for a related set of
>>>>>>>>>>> source descriptions such as files.
>>>>>>>>>>> >>>> (1) will allow the most code reuse but seems like it will make it
>>>>>>>>>>> hard to use the ReadAll transform as a cross-language transform and
>>>>>>>>>>> will
>>>>>>>>>>> break the separation of construction time and runtime constructs
>>>>>>>>>>> >>>> (2) could result in less code reuse if not careful but will
>>>>>>>>>>> make the transform easier to use as a cross-language transform
>>>>>>>>>>> without
>>>>>>>>>>> additional modifications
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> Also, with SDF, we can create ReadAll-like transforms that
>>>>>>>>>>> are more efficient. So we might be able to just define all sources 
>>>>>>>>>>> in that
>>>>>>>>>>> format and make Read transforms just an easy to use composite built 
>>>>>>>>>>> on top
>>>>>>>>>>> of that (by adding a preceding Create transform).
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> Thanks,
>>>>>>>>>>> >>>> Cham
>>>>>>>>>>> >>>>
>>>>>>>>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> I believe we do require PTransforms to be serializable
>>>>>>>>>>> since anonymous DoFns typically capture the enclosing PTransform.
>>>>>>>>>>> >>>>>
>>>>>>>>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>> >>>>>>
>>>>>>>>>>> >>>>>> Seems like Read in PCollection<Read> refers to a
>>>>>>>>>>> transform, at least here:
>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>>>>>>>>> >>>>>>
>>>>>>>>>>> >>>>>> I'm in favour of separating construction time transforms
>>>>>>>>>>> from execution time data objects that we store in PCollections as 
>>>>>>>>>>> Luke
>>>>>>>>>>> mentioned. Also, we don't guarantee that PTransform is serializable 
>>>>>>>>>>> so
>>>>>>>>>>> users have the additional complexity of providing a coder whenever
>>>>>>>>>>> a
>>>>>>>>>>> PTransform is used as a data object.
>>>>>>>>>>> >>>>>> Also, agree with Boyuan that using simple Java objects
>>>>>>>>>>> that are convertible to Beam Rows allows us to make these transforms
>>>>>>>>>>> available to other SDKs through the cross-language transforms. Using
>>>>>>>>>>> transforms or complex sources as data objects will probably make 
>>>>>>>>>>> this
>>>>>>>>>>> difficult.
>>>>>>>>>>> >>>>>>
>>>>>>>>>>> >>>>>> Thanks,
>>>>>>>>>>> >>>>>> Cham
>>>>>>>>>>> >>>>>>
>>>>>>>>>>> >>>>>>
>>>>>>>>>>> >>>>>>
>>>>>>>>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>> >>>>>>>
>>>>>>>>>>> >>>>>>> Hi Ismael,
>>>>>>>>>>> >>>>>>>
>>>>>>>>>>> >>>>>>> I think the ReadAll in the IO connector refers to the IO
>>>>>>>>>>> with SDF implementation regardless of the input type, whereas Read
>>>>>>>>>>> refers to
>>>>>>>>>>> UnboundedSource.  One major objection to using KafkaIO.Read as the source
>>>>>>>>>>> description is that not all configurations of KafkaIO.Read are
>>>>>>>>>>> meaningful
>>>>>>>>>>> to populate during execution time. Also, when thinking about x-lang
>>>>>>>>>>> usage,
>>>>>>>>>>> making the source description work across language boundaries is also
>>>>>>>>>>> necessary.  As
>>>>>>>>>>> Luke mentioned, it's quite easy to infer a Schema from an AutoValue
>>>>>>>>>>> object:
>>>>>>>>>>> KafkaSourceDescription.java. Then the coder of this schema-aware 
>>>>>>>>>>> object
>>>>>>>>>>> will be a SchemaCoder. When crossing language boundaries, it's also 
>>>>>>>>>>> easy to
>>>>>>>>>>> convert a Row into the source description: Convert.fromRows.
>>>>>>>>>>> >>>>>>>
>>>>>>>>>>> >>>>>>>
>>>>>>>>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>> >>>>>>>> To provide additional context, the KafkaIO ReadAll
>>>>>>>>>>> transform takes a PCollection<KafkaSourceDescriptor>. This
>>>>>>>>>>> KafkaSourceDescriptor is a POJO that contains the configurable 
>>>>>>>>>>> parameters
>>>>>>>>>>> for reading from Kafka. This is different from the pattern that 
>>>>>>>>>>> Ismael
>>>>>>>>>>> listed because they take PCollection<Read> as input and the Read is 
>>>>>>>>>>> the
>>>>>>>>>>> same as the Read PTransform class used for the non read all case.
>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>> >>>>>>>> The KafkaSourceDescriptor does lead to duplication
>>>>>>>>>>> since parameters used to configure the transform have to be copied 
>>>>>>>>>>> over to
>>>>>>>>>>> the source descriptor but decouples how a transform is specified 
>>>>>>>>>>> from the
>>>>>>>>>>> object that describes what needs to be done. I believe Ismael's 
>>>>>>>>>>> point is
>>>>>>>>>>> that we wouldn't need such a decoupling.
>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>> >>>>>>>> Another area that hasn't been discussed and I believe
>>>>>>>>>>> is a non-issue is that the Beam Java SDK has the most IO connectors 
>>>>>>>>>>> and we
>>>>>>>>>>> would want to use the IO implementations within Beam Go and Beam 
>>>>>>>>>>> Python.
>>>>>>>>>>> This brings in its own set of issues related to versioning and
>>>>>>>>>>> compatibility for the wire format and how one parameterizes such
>>>>>>>>>>> transforms. The wire format issue can be solved with either 
>>>>>>>>>>> approach by
>>>>>>>>>>> making sure that the cross language expansion always takes the well 
>>>>>>>>>>> known
>>>>>>>>>>> format (whatever it may be) and converts it into
>>>>>>>>>>> Read/KafkaSourceDescriptor/... object that is then passed to the 
>>>>>>>>>>> ReadAll
>>>>>>>>>>> transform. Boyuan has been looking to make the 
>>>>>>>>>>> KafkaSourceDescriptor have a
>>>>>>>>>>> schema so it can be represented as a row and this can be done 
>>>>>>>>>>> easily using
>>>>>>>>>>> the AutoValue integration (I don't believe there is anything 
>>>>>>>>>>> preventing
>>>>>>>>>>> someone from writing a schema row -> Read -> row adapter or also 
>>>>>>>>>>> using the
>>>>>>>>>>> AutoValue configuration if the transform is also an AutoValue).
>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>> >>>>>>>> I would be more for the code duplication and separation
>>>>>>>>>>> of concerns provided by using a different object to represent the 
>>>>>>>>>>> contents
>>>>>>>>>>> of the PCollection from the pipeline construction time PTransform.
>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>> >>>>>>>>> Hi Ismael,
>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>> >>>>>>>>> Thanks for taking this on. Have you considered an
>>>>>>>>>>> approach similar (or dual) to FileIO.write(), where we in a sense
>>>>>>>>>>> also have
>>>>>>>>>>> to configure a dynamic number of different IO transforms of the same
>>>>>>>>>>> type
>>>>>>>>>>> (file writes)?
>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>> >>>>>>>>> E.g. how in this example we configure many aspects of
>>>>>>>>>>> many file writes:
>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType,
>>>>>>>>>>> Transaction>writeDynamic()
>>>>>>>>>>> >>>>>>>>>      .by(Transaction::getType)
>>>>>>>>>>> >>>>>>>>>      .via(tx -> tx.getType().toFields(tx),  // Convert
>>>>>>>>>>> the data to be written to CSVSink
>>>>>>>>>>> >>>>>>>>>           type -> new CSVSink(type.getFieldNames()))
>>>>>>>>>>> >>>>>>>>>      .to(".../path/to/")
>>>>>>>>>>> >>>>>>>>>      .withNaming(type -> defaultNaming(type +
>>>>>>>>>>> "-transactions", ".csv"));
>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>> >>>>>>>>> we could do something similar for many JdbcIO reads:
>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>> >>>>>>>>> PCollection<Bar> bars;  // user-specific type from
>>>>>>>>>>> which all the read parameters can be inferred
>>>>>>>>>>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar,
>>>>>>>>>>> Moo>readAll()
>>>>>>>>>>> >>>>>>>>>   .fromQuery(bar -> ...compute query for this bar...)
>>>>>>>>>>> >>>>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
>>>>>>>>>>> >>>>>>>>>   .withBatchSize(bar -> ...compute batch size for this
>>>>>>>>>>> bar...)
>>>>>>>>>>> >>>>>>>>>   ...etc);
>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>> >>
>>>>>>>>>>> >>
>>>>>>>>>>>
>>>>>>>>>>>
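Eugene's closure-configured readAll, quoted above for JdbcIO, can be sketched without any Beam or JDBC dependency as follows. This is only an illustration of the shape of the idea: `ClosureReadAllSketch`, `expand` and the fake `backend` function are hypothetical, and the real proposal would operate on PCollections rather than lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java model of a closure-configured readAll; every name here is hypothetical.
final class ClosureReadAllSketch<ParamT, OutT> {
  private final Function<ParamT, String> queryFn;          // computes the query per element
  private final BiFunction<ParamT, String, OutT> mapperFn; // maps a raw row per element

  private ClosureReadAllSketch(
      Function<ParamT, String> queryFn, BiFunction<ParamT, String, OutT> mapperFn) {
    this.queryFn = queryFn;
    this.mapperFn = mapperFn;
  }

  static <P, O> ClosureReadAllSketch<P, O> readAll() {
    return new ClosureReadAllSketch<>(null, null);
  }

  ClosureReadAllSketch<ParamT, OutT> fromQuery(Function<ParamT, String> fn) {
    return new ClosureReadAllSketch<>(fn, mapperFn);
  }

  ClosureReadAllSketch<ParamT, OutT> withMapper(BiFunction<ParamT, String, OutT> fn) {
    return new ClosureReadAllSketch<>(queryFn, fn);
  }

  /** Runs each element's query against a fake backend that returns raw rows as strings. */
  List<OutT> expand(List<ParamT> input, Function<String, List<String>> backend) {
    if (queryFn == null || mapperFn == null) {
      throw new IllegalStateException("fromQuery and withMapper must both be set");
    }
    List<OutT> out = new ArrayList<>();
    for (ParamT element : input) {
      for (String rawRow : backend.apply(queryFn.apply(element))) {
        out.add(mapperFn.apply(element, rawRow));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Integer> lengths =
        ClosureReadAllSketch.<String, Integer>readAll()
            .fromQuery(table -> "SELECT * FROM " + table)
            .withMapper((table, rawRow) -> rawRow.length())
            .expand(Arrays.asList("a", "bb"), query -> Arrays.asList(query));
    System.out.println(lengths); // prints [15, 16]
  }
}
```

Here the input elements stay an arbitrary user type and all read parameters are derived from them by closures, which is exactly what makes this style hard to carry across language boundaries, as discussed at the top of the thread.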
