Kenn - I don't mean an enum of common closures, I mean expressing closures
in a restricted sub-language such as the language of SQL expressions. That
would only work if there is a portable way to interpret SQL expressions,
but if there isn't, maybe there should be - for the sake of, well,
expressing closures portably. Of course these would be closures that only
work with rows - but that seems powerful enough for many if not most
purposes.
For example, maybe the Java example:
PCollection<BankTransaction> transactions = ...;
transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
    .by(Transaction::getType)
    .via(tx -> tx.getType().toFields(tx),  // Convert the data to be written to CSVSink
         type -> new CSVSink(type.getFieldNames()))
    .to(".../path/to/")
    .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
could be written in Python as:
transactions | fileio.write_dynamic(
    # "it" is implicitly available in these SQL expressions as the same thing
    # as the Java lambda argument.
    by="it.type",
    format="it.fields",
    # A bunch of preset sinks supported in every language?
    sink="CSV_SINK(it.type.field_names)",
    to=".../path/to/",
    naming="DEFAULT_NAMING(CONCAT(it, '-transactions'), '.csv')")
Again, to be clear, I'm not suggesting that we block what Ismael is proposing
on getting this done - this wouldn't be a short-term effort, but it seems
potentially really nice.
On Wed, Jul 1, 2020 at 3:19 PM Robert Burke <[email protected]> wrote:
> From the Go side of the table, the Go language doesn't provide a mechanism
> to serialize or access closure data, which means DoFns can't be functional
> closures. This, combined with the move to have the "Structural DoFns" be
> serialized using Beam Schemas, has the net result that if Go transforms are
> used for Cross Language, they will be configurable with a Schema of the
> configuration data.
>
> Of course, this just means that each language will probably provide
> whichever mechanisms it likes for use of its cross-language transforms.
>
> On Tue, 30 Jun 2020 at 16:07, Kenneth Knowles <[email protected]> wrote:
>
>> I don't think an enum of most common closures will work. The input types
>> are typically generics that are made concrete by the caller who also
>> provides the closures. I think Luke's (2) is the same idea as my "Java
>> still assembles it [using opaque Python closures/transforms]". It seems
>> like an approach to (3). Passing over actual code could address some cases,
>> but libraries become the issue.
>>
>> I think it is fair to say that "WriteAll" style would involve entering
>> unexplored territory.
>>
>> On the main topic, I think Brian has a pretty strong point and his
>> example of type conversion lambdas illustrates it well. I did a quick survey
>> and every other property I could find does seem like it fits on the Read,
>> and most IOs have a few of these closures, for example for extracting
>> timestamps. So maybe just a resolution convention of putting them on the
>> ReadAll and that taking precedence. Then you would be deserializing a Read
>> transform with insta-crash methods or some such?
>>
>> Kenn
>>
>> On Tue, Jun 30, 2020 at 10:24 AM Eugene Kirpichov <[email protected]>
>> wrote:
>>
>>> Yeah, mainly I just feel like dynamic reads and dynamic writes (and
>>> perhaps not-yet-invented similar transforms of other kinds) are tightly
>>> related - they are either very similar, or are duals of each other - so
>>> they should use the same approach. If they are using different approaches,
>>> it is a sign that either one of them is being done wrong or that we are
>>> running into a fundamental limitation of Beam (e.g. difficulty of encoding
>>> closures compared to encoding elements).
>>>
>>> But I agree with Luke that we shouldn't give up on closures. Especially
>>> with the work that has been done on schemas and SQL, I see no reason why we
>>> couldn't express closures in a portable restricted sub-language. If we can
>>> express SQL, we can express many or most use cases of dynamic reads/writes
>>> - I don't mean that we should actually use SQL (though we *could* -
>>> e.g. SQL scalar expressions seem powerful enough to express the closures
>>> appearing in most use cases of FileIO.writeDynamic), I just mean that SQL
>>> is an existence proof.
>>>
>>> (I don't want to rock the boat too much, just thought I'd chime in as
>>> this topic is dear to my heart)
>>>
>>> On Tue, Jun 30, 2020 at 9:59 AM Luke Cwik <[email protected]> wrote:
>>>
>>>> Kenn, I'm not too worried about closures since:
>>>> 1) the expansion service for a transform could have a well-defined set of
>>>> closures by name that are returned as serialized objects that don't
>>>> need to be interpretable by the caller
>>>> 2) the language could store serialized functions of another language as
>>>> constants
>>>> 3) generic XLang function support will eventually be needed
>>>> but I do agree that closures do make things difficult to express vs
>>>> data, which is primarily why we should prefer data over closures when
>>>> possible and use closures when expressing it with data would be too
>>>> cumbersome.
>>>>
>>>> Brian, so far the cases that have been migrated have shown that the
>>>> source descriptor and the Read transform are almost the same (some
>>>> parameters that only impact pipeline construction such as coders differ).
>>>>
>>>> On Mon, Jun 29, 2020 at 2:33 PM Brian Hulette <[email protected]>
>>>> wrote:
>>>>
>>>>> Sorry for jumping into this late and casting a vote against the
>>>>> consensus... but I think I'd prefer standardizing on a pattern like
>>>>> PCollection<KafkaSourceDescriptor> rather than PCollection<Read>. That
>>>>> approach clearly separates the parameters that are allowed to vary across a
>>>>> ReadAll (the ones defined in KafkaSourceDescriptor) from the parameters
>>>>> that should be constant (other parameters in the Read object, like
>>>>> SerializedFunctions for type conversions, parameters for different
>>>>> operating modes, etc...). I think it's helpful to think of the parameters
>>>>> that are allowed to vary as some "location descriptor", but I imagine IO
>>>>> authors may want other parameters to vary across a ReadAll as well.
>>>>>
>>>>> To me it seems safer to let an IO author "opt-in" to a parameter being
>>>>> dynamic at execution time.
>>>>>
>>>>> Brian
>>>>>
>>>>> On Mon, Jun 29, 2020 at 9:26 AM Eugene Kirpichov <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I'd like to raise one more time the question of consistency between
>>>>>> dynamic reads and dynamic writes, per my email at the beginning of the
>>>>>> thread.
>>>>>> If the community prefers ReadAll to read from Read, then should
>>>>>> dynamic writes write to Write?
>>>>>>
>>>>>> On Mon, Jun 29, 2020 at 8:57 AM Boyuan Zhang <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> It seems like most of us agree on the idea that ReadAll should read
>>>>>>> from Read. I'm going to update the Kafka ReadAll with the same pattern.
>>>>>>> Thanks for all your help!
>>>>>>>
>>>>>>> On Fri, Jun 26, 2020 at 12:12 PM Chamikara Jayalath <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jun 26, 2020 at 11:49 AM Luke Cwik <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I would also like to suggest that transforms that implement
>>>>>>>>> ReadAll via Read should also provide methods like:
>>>>>>>>>
>>>>>>>>> // Uses the specified values if unspecified in the input element
>>>>>>>>> from the PCollection<Read>.
>>>>>>>>> withDefaults(Read read);
>>>>>>>>> // Uses the specified values regardless of what the input element
>>>>>>>>> from the PCollection<Read> specifies.
>>>>>>>>> withOverrides(Read read);
>>>>>>>>>
>>>>>>>>> and only add methods that are required at construction time (e.g.
>>>>>>>>> coders). This way the majority of documentation sits on the Read
>>>>>>>>> transform.
>>>>>>>>>
>>>>>>>>
>>>>>>>> +0 from me. Sounds like the benefits outweigh the drawbacks here and
>>>>>>>> some of the drawbacks related to cross-language can be overcome through
>>>>>>>> future advancements.
>>>>>>>> Thanks for bringing this up Ismaël.
>>>>>>>>
>>>>>>>> - Cham
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Jun 26, 2020 at 9:58 AM Luke Cwik <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Ismael, it is good to hear that using Read as the input didn't
>>>>>>>>>> have a bunch of parameters that were being skipped/ignored. Also,
>>>>>>>>>> for the
>>>>>>>>>> polymorphism issue you have to rely on the user correctly telling
>>>>>>>>>> you the
>>>>>>>>>> type in such a way that it is a common ancestor of all the runtime
>>>>>>>>>> types
>>>>>>>>>> that will ever be used. This usually boils down to something like
>>>>>>>>>> Serializable or DynamicMessage such that the coder that is chosen
>>>>>>>>>> works for
>>>>>>>>>> all the runtime types. Using multiple types is a valid use case and
>>>>>>>>>> would
>>>>>>>>>> allow for a simpler graph with fewer flattens merging the output from
>>>>>>>>>> multiple sources.
>>>>>>>>>>
>>>>>>>>>> Boyuan, as you have mentioned we can have a coder for
>>>>>>>>>> KafkaIO.Read which uses schemas even if some of the parameters can't
>>>>>>>>>> be
>>>>>>>>>> represented in a meaningful way beyond "bytes". This would be
>>>>>>>>>> helpful for
>>>>>>>>>> cross language as well since every parameter would become available
>>>>>>>>>> if a
>>>>>>>>>> language could support it (e.g. it could serialize a java function
>>>>>>>>>> up front
>>>>>>>>>> and keep it saved as raw bytes within said language). Even if we
>>>>>>>>>> figure out
>>>>>>>>>> a better way to do this in the future, we'll have to change the
>>>>>>>>>> schema for
>>>>>>>>>> the new way anyway. This would mean that the external version of the
>>>>>>>>>> transform adopts Row to Read and we drop KafkaSourceDescriptor. The
>>>>>>>>>> conversion from Row to Read could validate that the parameters make
>>>>>>>>>> sense
>>>>>>>>>> (e.g. the bytes are valid serialized functions). The addition of an
>>>>>>>>>> endReadTime/endReadOffset would make sense for KafkaIO.Read as well
>>>>>>>>>> and
>>>>>>>>>> this would enable having a bounded version that could be used for
>>>>>>>>>> backfills
>>>>>>>>>> (this doesn't have to be done as part of any current ongoing PR).
>>>>>>>>>> Essentially any parameter that could be added for a single instance
>>>>>>>>>> of a
>>>>>>>>>> Kafka element+restriction would also make sense to the KafkaIO.Read
>>>>>>>>>> transform since it too is a single instance. There are parameters
>>>>>>>>>> that
>>>>>>>>>> would apply to the ReadAll that wouldn't apply to a read and these
>>>>>>>>>> would be
>>>>>>>>>> global parameters across all element+restriction pairs such as config
>>>>>>>>>> overrides or default values.
>>>>>>>>>>
>>>>>>>>>> I am convinced that we should do as Ismael is suggesting and use
>>>>>>>>>> KafkaIO.Read as the type.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jun 25, 2020 at 6:00 PM Chamikara Jayalath <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Discussion regarding cross-language transforms is a slight
>>>>>>>>>>> tangent here. But I think, in general, it's great if we can use
>>>>>>>>>>> existing
>>>>>>>>>>> transforms (for example, IO connectors) as cross-language transforms
>>>>>>>>>>> without having to build more composites (irrespective of whether in
>>>>>>>>>>> ExternalTransformBuilders or in user pipelines) just to make them
>>>>>>>>>>> cross-language compatible. A future cross-language compatible
>>>>>>>>>>> SchemaCoder
>>>>>>>>>>> might help (assuming that works for Read transform) but I'm not
>>>>>>>>>>> sure we
>>>>>>>>>>> have a good idea when we'll get to that state.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Cham
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> For unbounded SDF in Kafka, we also consider the
>>>>>>>>>>>> upgrading/downgrading compatibility in the pipeline update
>>>>>>>>>>>> scenario (for
>>>>>>>>>>>> detailed discussion, please refer to
>>>>>>>>>>>> https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E).
>>>>>>>>>>>> In order to obtain that compatibility, the input of the read SDF
>>>>>>>>>>>> needs to be schema-aware.
>>>>>>>>>>>>
>>>>>>>>>>>> Thus the major constraint of mapping KafkaSourceDescriptor to
>>>>>>>>>>>> PCollection<Read> is that KafkaIO.Read also needs to be schema-aware,
>>>>>>>>>>>> otherwise pipeline updates might fail unnecessarily. Looking into
>>>>>>>>>>>> KafkaIO.Read, not all necessary fields are compatible with schema, for
>>>>>>>>>>>> example SerializedFunction.
>>>>>>>>>>>>
>>>>>>>>>>>> I'm kind of confused by why ReadAll<Read, OutputT> is a common
>>>>>>>>>>>> pattern for SDF based IO. The Read can be a common pattern because
>>>>>>>>>>>> the
>>>>>>>>>>>> input is always a PBegin. But for an SDF based IO, the input can be
>>>>>>>>>>>> anything. By using Read as input, we will still have the
>>>>>>>>>>>> maintenance cost
>>>>>>>>>>>> when SDF IO supports a new field but Read doesn't consume it. For
>>>>>>>>>>>> example,
>>>>>>>>>>>> we are discussing adding endOffset and endReadTime to
>>>>>>>>>>>> KafkaSourceDescriptor, which are not used in KafkaIO.Read.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> We forgot to mention (5) External.Config used in cross-lang,
>>>>>>>>>>>>> see KafkaIO
>>>>>>>>>>>>> ExternalTransformBuilder. This approach is the predecessor of
>>>>>>>>>>>>> (4) and probably a
>>>>>>>>>>>>> really good candidate to be replaced by the Row based
>>>>>>>>>>>>> Configuration Boyuan is
>>>>>>>>>>>>> envisioning (so good to be aware of this).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the clear explanation Luke, you mention the real
>>>>>>>>>>>>> issue(s). All the
>>>>>>>>>>>>> approaches discussed so far in the end could be easily
>>>>>>>>>>>>> transformed to produce a
>>>>>>>>>>>>> PCollection<Read> and those Read Elements could be read by the
>>>>>>>>>>>>> generic ReadAll
>>>>>>>>>>>>> transform. Notice that this can be internal in some IOs e.g.
>>>>>>>>>>>>> KafkaIO if they
>>>>>>>>>>>>> decide not to expose it. I am not saying that we should force
>>>>>>>>>>>>> every IO to
>>>>>>>>>>>>> support ReadAll in its public API but if we do it is probably
>>>>>>>>>>>>> a good idea to be
>>>>>>>>>>>>> consistent with naming the transform that expects an input
>>>>>>>>>>>>> PCollection<Read> in
>>>>>>>>>>>>> the same way. Also notice that using it will save us from the
>>>>>>>>>>>>> maintenance issues
>>>>>>>>>>>>> discussed in my previous email.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Back to the main concern: the consequences of expansion based
>>>>>>>>>>>>> on Read: So far I
>>>>>>>>>>>>> have not seen consequences for the Splitting part, which maps
>>>>>>>>>>>>> really nicely
>>>>>>>>>>>>> assuming the Partition info / Restriction is available as part
>>>>>>>>>>>>> of Read. So far
>>>>>>>>>>>>> there are no Serialization issues because Beam is already enforcing
>>>>>>>>>>>>> this. Notice that
>>>>>>>>>>>>> ReadAll expansion is almost ‘equivalent’ to a poor man's SDF at
>>>>>>>>>>>>> least for the
>>>>>>>>>>>>> Bounded case (see the code in my previous email). For the
>>>>>>>>>>>>> other points:
>>>>>>>>>>>>>
>>>>>>>>>>>>> > a) Do all properties set on a Read apply to the ReadAll? For
>>>>>>>>>>>>> example, the
>>>>>>>>>>>>> > Kafka Read implementation allows you to set the key and
>>>>>>>>>>>>> value deserializers
>>>>>>>>>>>>> > which are also used to dictate the output PCollection type.
>>>>>>>>>>>>> It also allows you
>>>>>>>>>>>>> > to set how the watermark should be computed. Technically a
>>>>>>>>>>>>> user may want the
>>>>>>>>>>>>> > watermark computation to be configurable per Read and they
>>>>>>>>>>>>> may also want an
>>>>>>>>>>>>> > output type which is polymorphic (e.g.
>>>>>>>>>>>>> Pcollection<Serializable>).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Most of the time they do, but for parametric types we cannot
>>>>>>>>>>>>> support different
>>>>>>>>>>>>> types in the outputs of the Read or at least I did not find
>>>>>>>>>>>>> how to do so (is
>>>>>>>>>>>>> there a way to use multiple output Coders on Beam?), we saw
>>>>>>>>>>>>> this in CassandraIO
>>>>>>>>>>>>> and we were discussing adding explicitly these Coders or
>>>>>>>>>>>>> Serializer
>>>>>>>>>>>>> specific methods to the ReadAll transform. This is less nice
>>>>>>>>>>>>> because it will
>>>>>>>>>>>>> imply some repeated methods, but it is still a compromise to
>>>>>>>>>>>>> gain the other
>>>>>>>>>>>>> advantages. I suppose the watermark case you mention is
>>>>>>>>>>>>> similar because you may
>>>>>>>>>>>>> want the watermark to behave differently in each Read and we
>>>>>>>>>>>>> probably don’t
>>>>>>>>>>>>> support this, so it corresponds to the polymorphic category.
>>>>>>>>>>>>>
>>>>>>>>>>>>> > b) Read extends PTransform which brings its own object
>>>>>>>>>>>>> modelling concerns.
>>>>>>>>>>>>>
>>>>>>>>>>>>> > During the implementations of ReadAll(PCollection<Read>),
>>>>>>>>>>>>> was it discovered
>>>>>>>>>>>>> > that some properties became runtime errors or were ignored
>>>>>>>>>>>>> if they were set?
>>>>>>>>>>>>> > If no, then the code deduplication is likely worth it
>>>>>>>>>>>>> because we also get a
>>>>>>>>>>>>> > lot of javadoc deduplication, but if yes is this an
>>>>>>>>>>>>> acceptable user
>>>>>>>>>>>>> > experience?
>>>>>>>>>>>>>
>>>>>>>>>>>>> No, not so far. This is an interesting part, notice that the
>>>>>>>>>>>>> Read translation
>>>>>>>>>>>>> ends up delegating the read bits to the ReadFn part of ReadAll
>>>>>>>>>>>>> so the ReadFn is
>>>>>>>>>>>>> the real read and must be aware of and use all the parameters.
>>>>>>>>>>>>>
>>>>>>>>>>>>> @Override
>>>>>>>>>>>>> public PCollection<SolrDocument> expand(PBegin input) {
>>>>>>>>>>>>>   return input.apply("Create", Create.of(this)).apply("ReadAll", readAll());
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> I might be missing something for the Unbounded SDF case which
>>>>>>>>>>>>> is the only case
>>>>>>>>>>>>> we have not explored so far. I think one easy way to see the
>>>>>>>>>>>>> limitations would
>>>>>>>>>>>>> be in the ongoing KafkaIO SDF based implementation to try to
>>>>>>>>>>>>> map
>>>>>>>>>>>>> KafkaSourceDescriptor to do the extra PCollection<Read> and
>>>>>>>>>>>>> the Read logic on
>>>>>>>>>>>>> the ReadAll with the SDF to see which constraints we hit, the
>>>>>>>>>>>>> polymorphic ones
>>>>>>>>>>>>> will be there for sure, maybe others will appear (not sure).
>>>>>>>>>>>>> However it would be
>>>>>>>>>>>>> interesting to see if we have a real gain in the maintenance
>>>>>>>>>>>>> points, but well
>>>>>>>>>>>>> let’s not forget also that KafkaIO has a LOT of knobs so
>>>>>>>>>>>>> probably the generic
>>>>>>>>>>>>> implementation could be relatively complex.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <[email protected]>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > I had mentioned that approach 1 and approach 2 work for
>>>>>>>>>>>>> cross language. The difference being that the cross language
>>>>>>>>>>>>> transform
>>>>>>>>>>>>> would take a well known definition and convert it to the Read
>>>>>>>>>>>>> transform. A
>>>>>>>>>>>>> normal user would have a pipeline that would look like:
>>>>>>>>>>>>> > 1: PCollection<Read> -> PTransform(ReadAll) ->
>>>>>>>>>>>>> PCollection<Output>
>>>>>>>>>>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) ->
>>>>>>>>>>>>> PCollection<Output>
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > And in the cross language case this would look like:
>>>>>>>>>>>>> > 1: PCollection<Row of SourceDescriptor> ->
>>>>>>>>>>>>> PTransform(Convert Row to Read) -> PCollection<Read> ->
>>>>>>>>>>>>> PTransform(ReadAll)
>>>>>>>>>>>>> -> PCollection<Output>
>>>>>>>>>>>>> > 2: PCollection<Row of SourceDescriptor> ->
>>>>>>>>>>>>> PTransform(Convert Row to SourceDescriptor) ->
>>>>>>>>>>>>> PCollection<SourceDescriptor> -> PTransform(ReadAll) ->
>>>>>>>>>>>>> PCollection<Output>*
>>>>>>>>>>>>> > * note that PTransform(Convert Row to SourceDescriptor) only
>>>>>>>>>>>>> exists since we haven't solved how to use schemas with language
>>>>>>>>>>>>> bound types
>>>>>>>>>>>>> in a cross language way. SchemaCoder isn't portable but RowCoder
>>>>>>>>>>>>> is which
>>>>>>>>>>>>> is why the conversion step exists. We could have a solution for
>>>>>>>>>>>>> this at
>>>>>>>>>>>>> some point in time.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > My concern with using Read was around:
>>>>>>>>>>>>> > a) Do all properties set on a Read apply to the ReadAll? For
>>>>>>>>>>>>> example, the Kafka Read implementation allows you to set the key
>>>>>>>>>>>>> and value
>>>>>>>>>>>>> deserializers which are also used to dictate the output
>>>>>>>>>>>>> PCollection type.
>>>>>>>>>>>>> It also allows you to set how the watermark should be computed.
>>>>>>>>>>>>> Technically
>>>>>>>>>>>>> a user may want the watermark computation to be configurable per
>>>>>>>>>>>>> Read and
>>>>>>>>>>>>> they may also want an output type which is polymorphic (e.g.
>>>>>>>>>>>>> PCollection<Serializable>).
>>>>>>>>>>>>> > b) Read extends PTransform which brings its own object
>>>>>>>>>>>>> modelling concerns.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > During the implementations of ReadAll(PCollection<Read>),
>>>>>>>>>>>>> was it discovered that some properties became runtime errors or
>>>>>>>>>>>>> were
>>>>>>>>>>>>> ignored if they were set? If no, then the code deduplication is
>>>>>>>>>>>>> likely
>>>>>>>>>>>>> worth it because we also get a lot of javadoc deduplication, but
>>>>>>>>>>>>> if yes is
>>>>>>>>>>>>> this an acceptable user experience?
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >> I believe that the initial goal of unifying ReadAll as a
>>>>>>>>>>>>> general "PTransform<PCollection<Read>, PCollection<OutputType>>”
>>>>>>>>>>>>> was to
>>>>>>>>>>>>> reduce the amount of code duplication and error-prone approach
>>>>>>>>>>>>> related to
>>>>>>>>>>>>> this. It makes much sense since usually we have all needed
>>>>>>>>>>>>> configuration
>>>>>>>>>>>>> set in Read objects and, as Ismaël mentioned, ReadAll will
>>>>>>>>>>>>> consist mostly
>>>>>>>>>>>>> of only Split-Shuffle-Read stages. So this case usually can be
>>>>>>>>>>>>> unified by
>>>>>>>>>>>>> using PCollection<Read> as input.
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >> On the other hand, we have another need to use Java IOs as
>>>>>>>>>>>>> cross-language transforms (as Luke described) which seems only
>>>>>>>>>>>>> partly in
>>>>>>>>>>>>> common with previous pattern of ReadAll using.
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >> I’d be more in favour to have only one concept of read
>>>>>>>>>>>>> configuration for all needs but seems it’s not easy and I’d be
>>>>>>>>>>>>> more in
>>>>>>>>>>>>> favour with Luke and Boyuan approach with schema. Though, maybe
>>>>>>>>>>>>> ReadAll is
>>>>>>>>>>>>> not a very suitable name in this case because it can bring
>>>>>>>>>>>>> some
>>>>>>>>>>>>> confusions related to previous pattern of ReadAll uses.
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang <[email protected]>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >> Sorry for the typo. I mean I think we can go with (3) and
>>>>>>>>>>>>> (4): use the data type that is schema-aware as the input of
>>>>>>>>>>>>> ReadAll.
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> Thanks for the summary, Cham!
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> I think we can go with (2) and (4): use the data type that
>>>>>>>>>>>>> is schema-aware as the input of ReadAll.
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> Converting Read into ReadAll helps us to stick with
>>>>>>>>>>>>> SDF-like IO. But only having (3) is not enough to solve the
>>>>>>>>>>>>> problem of
>>>>>>>>>>>>> using ReadAll in x-lang case.
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> The key point of ReadAll is that the input type of ReadAll
>>>>>>>>>>>>> should be able to cross language boundaries and have
>>>>>>>>>>>>> compatibilities of
>>>>>>>>>>>>> updating/downgrading. After investigating some possibilities (pure
>>>>>>>>>>>>> java pojo
>>>>>>>>>>>>> with custom coder, protobuf, row/schema) in Kafka usage, we find
>>>>>>>>>>>>> that
>>>>>>>>>>>>> row/schema fits our needs most. Here comes (4). I believe that
>>>>>>>>>>>>> using Read
>>>>>>>>>>>>> as input of ReadAll makes sense in some cases, but I also think
>>>>>>>>>>>>> not all IOs
>>>>>>>>>>>>> have the same need. I would treat Read as a special type as long
>>>>>>>>>>>>> as the
>>>>>>>>>>>>> Read is schema-aware.
>>>>>>>>>>>>> >>>
>>>>>>>>>>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> I see. So it seems like there are three options discussed
>>>>>>>>>>>>> so far when it comes to defining source descriptors for ReadAll
>>>>>>>>>>>>> type
>>>>>>>>>>>>> transforms
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> (1) Use Read PTransform as the element type of the input
>>>>>>>>>>>>> PCollection
>>>>>>>>>>>>> >>>> (2) Use a POJO that describes the source as the data
>>>>>>>>>>>>> element of the input PCollection
>>>>>>>>>>>>> >>>> (3) Provide a converter as a function to the Read
>>>>>>>>>>>>> transform which essentially will convert it to a ReadAll (what
>>>>>>>>>>>>> Eugene
>>>>>>>>>>>>> mentioned)
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> I feel like (3) is more suitable for a related set of
>>>>>>>>>>>>> source descriptions such as files.
>>>>>>>>>>>>> >>>> (1) will allow most code-reuse but seems like will make
>>>>>>>>>>>>> it hard to use the ReadAll transform as a cross-language
>>>>>>>>>>>>> transform and will
>>>>>>>>>>>>> break the separation of construction time and runtime constructs
>>>>>>>>>>>>> >>>> (2) could result in less code reuse if not careful but
>>>>>>>>>>>>> will make the transform easier to be used as a cross-language
>>>>>>>>>>>>> transform
>>>>>>>>>>>>> without additional modifications
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> Also, with SDF, we can create ReadAll-like transforms
>>>>>>>>>>>>> that are more efficient. So we might be able to just define all
>>>>>>>>>>>>> sources in
>>>>>>>>>>>>> that format and make Read transforms just an easy to use
>>>>>>>>>>>>> composite built on
>>>>>>>>>>>>> top of that (by adding a preceding Create transform).
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> Thanks,
>>>>>>>>>>>>> >>>> Cham
>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> I believe we do require PTransforms to be serializable
>>>>>>>>>>>>> since anonymous DoFns typically capture the enclosing PTransform.
>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>> >>>>>> Seems like Read in PCollection<Read> refers to a
>>>>>>>>>>>>> transform, at least here:
>>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>> >>>>>> I'm in favour of separating construction time
>>>>>>>>>>>>> transforms from execution time data objects that we store in
>>>>>>>>>>>>> PCollections
>>>>>>>>>>>>> as Luke mentioned. Also, we don't guarantee that PTransform is
>>>>>>>>>>>>> serializable
>>>>>>>>>>>>> so users have the additional complexity of providing a coder
>>>>>>>>>>>>> whenever a
>>>>>>>>>>>>> PTransform is used as a data object.
>>>>>>>>>>>>> >>>>>> Also, agree with Boyuan that using simple Java objects
>>>>>>>>>>>>> that are convertible to Beam Rows allow us to make these
>>>>>>>>>>>>> transforms
>>>>>>>>>>>>> available to other SDKs through the cross-language transforms.
>>>>>>>>>>>>> Using
>>>>>>>>>>>>> transforms or complex sources as data objects will probably make
>>>>>>>>>>>>> this
>>>>>>>>>>>>> difficult.
>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>> >>>>>> Thanks,
>>>>>>>>>>>>> >>>>>> Cham
>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>> >>>>>>>
>>>>>>>>>>>>> >>>>>>> Hi Ismael,
>>>>>>>>>>>>> >>>>>>>
>>>>>>>>>>>>> >>>>>>> I think the ReadAll in the IO connector refers to the
>>>>>>>>>>>>> IO with SDF implementation regardless of the type of input, where Read
>>>>>>>>>>>>> refers to
>>>>>>>>>>>>> UnboundedSource. One major pushback of using KafkaIO.Read as
>>>>>>>>>>>>> source
>>>>>>>>>>>>> description is that not all configurations of KafkaIO.Read are
>>>>>>>>>>>>> meaningful
>>>>>>>>>>>>> to populate during execution time. Also when thinking about
>>>>>>>>>>>>> x-lang usage,
>>>>>>>>>>>>> making source description across language boundaries is also
>>>>>>>>>>>>> necessary. As
>>>>>>>>>>>>> Luke mentioned, it's quite easy to infer a Schema from an
>>>>>>>>>>>>> AutoValue object:
>>>>>>>>>>>>> KafkaSourceDescription.java. Then the coder of this schema-aware
>>>>>>>>>>>>> object
>>>>>>>>>>>>> will be a SchemaCoder. When crossing language boundaries, it's
>>>>>>>>>>>>> also easy to
>>>>>>>>>>>>> convert a Row into the source description: Convert.fromRows.
>>>>>>>>>>>>> >>>>>>>
>>>>>>>>>>>>> >>>>>>>
>>>>>>>>>>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>> >>>>>>>> To provide additional context, the KafkaIO ReadAll
>>>>>>>>>>>>> transform takes a PCollection<KafkaSourceDescriptor>. This
>>>>>>>>>>>>> KafkaSourceDescriptor is a POJO that contains the configurable
>>>>>>>>>>>>> parameters
>>>>>>>>>>>>> for reading from Kafka. This is different from the pattern that
>>>>>>>>>>>>> Ismael
>>>>>>>>>>>>> listed because they take PCollection<Read> as input and the Read
>>>>>>>>>>>>> is the
>>>>>>>>>>>>> same as the Read PTransform class used for the non read all case.
>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>> >>>>>>>> The KafkaSourceDescriptor does lead to duplication
>>>>>>>>>>>>> since parameters used to configure the transform have to be
>>>>>>>>>>>>> copied over to
>>>>>>>>>>>>> the source descriptor but decouples how a transform is specified
>>>>>>>>>>>>> from the
>>>>>>>>>>>>> object that describes what needs to be done. I believe Ismael's
>>>>>>>>>>>>> point is
>>>>>>>>>>>>> that we wouldn't need such a decoupling.
>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>> >>>>>>>> Another area that hasn't been discussed and I believe
>>>>>>>>>>>>> is a non-issue is that the Beam Java SDK has the most IO
>>>>>>>>>>>>> connectors and we
>>>>>>>>>>>>> would want to use the IO implementations within Beam Go and Beam
>>>>>>>>>>>>> Python.
>>>>>>>>>>>>> This brings in its own set of issues related to versioning and
>>>>>>>>>>>>> compatibility for the wire format and how one parameterizes such
>>>>>>>>>>>>> transforms. The wire format issue can be solved with either
>>>>>>>>>>>>> approach by
>>>>>>>>>>>>> making sure that the cross language expansion always takes the
>>>>>>>>>>>>> well known
>>>>>>>>>>>>> format (whatever it may be) and converts it into
>>>>>>>>>>>>> Read/KafkaSourceDescriptor/... object that is then passed to the
>>>>>>>>>>>>> ReadAll
>>>>>>>>>>>>> transform. Boyuan has been looking to make the
>>>>>>>>>>>>> KafkaSourceDescriptor have a
>>>>>>>>>>>>> schema so it can be represented as a row and this can be done
>>>>>>>>>>>>> easily using
>>>>>>>>>>>>> the AutoValue integration (I don't believe there is anything
>>>>>>>>>>>>> preventing
>>>>>>>>>>>>> someone from writing a schema row -> Read -> row adapter or also
>>>>>>>>>>>>> using the
>>>>>>>>>>>>> AutoValue configuration if the transform is also an AutoValue).
>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>> >>>>>>>> I would be more for the code duplication and
>>>>>>>>>>>>> separation of concerns provided by using a different object to
>>>>>>>>>>>>> represent
>>>>>>>>>>>>> the contents of the PCollection from the pipeline construction
>>>>>>>>>>>>> time
>>>>>>>>>>>>> PTransform.
>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>> Hi Ismael,
>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>> Thanks for taking this on. Have you considered an
>>>>>>>>>>>>> approach similar (or dual) to FileIO.write(), where we in a sense
>>>>>>>>>>>>> also have
>>>>>>>>>>>>> to configure a dynamic number of different IO transforms of the same
>>>>>>>>>>>>> type
>>>>>>>>>>>>> (file writes)?
>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>> E.g. how in this example we configure many aspects
>>>>>>>>>>>>> of many file writes:
>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType,
>>>>>>>>>>>>> Transaction>writeDynamic()
>>>>>>>>>>>>> >>>>>>>>> .by(Transaction::getType)
>>>>>>>>>>>>> >>>>>>>>> .via(tx -> tx.getType().toFields(tx), // Convert the data to be written to CSVSink
>>>>>>>>>>>>> >>>>>>>>> type -> new CSVSink(type.getFieldNames()))
>>>>>>>>>>>>> >>>>>>>>> .to(".../path/to/")
>>>>>>>>>>>>> >>>>>>>>> .withNaming(type -> defaultNaming(type +
>>>>>>>>>>>>> "-transactions", ".csv"));
>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>> we could do something similar for many JdbcIO reads:
>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>> PCollection<Bar> bars; // user-specific type from
>>>>>>>>>>>>> which all the read parameters can be inferred
>>>>>>>>>>>>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar,
>>>>>>>>>>>>> Moo>readAll()
>>>>>>>>>>>>> >>>>>>>>> .fromQuery(bar -> ...compute query for this bar...)
>>>>>>>>>>>>> >>>>>>>>> .withMapper((bar, resultSet) -> new Moo(...))
>>>>>>>>>>>>> >>>>>>>>> .withBatchSize(bar -> ...compute batch size for
>>>>>>>>>>>>> this bar...)
>>>>>>>>>>>>> >>>>>>>>> ...etc);
>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>> Hello,
>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>> (my excuses for the long email but this requires
>>>>>>>>>>>>> context)
>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>> As part of the move from Source based IOs to DoFn
>>>>>>>>>>>>> based ones, one pattern
>>>>>>>>>>>>> >>>>>>>>>> emerged due to the composable nature of DoFn. The
>>>>>>>>>>>>> idea is to have a different
>>>>>>>>>>>>> >>>>>>>>>> kind of composable reads where we take a
>>>>>>>>>>>>> PCollection of different sorts of
>>>>>>>>>>>>> >>>>>>>>>> intermediate specifications e.g. tables, queries,
>>>>>>>>>>>>> etc, for example:
>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>> JdbcIO:
>>>>>>>>>>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends
>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<ParameterT>,
>>>>>>>>>>>>> PCollection<OutputT>>
>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>> RedisIO:
>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<String>,
>>>>>>>>>>>>> PCollection<KV<String, String>>>
>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>,
>>>>>>>>>>>>> PCollection<Result>>
>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>> These patterns enabled richer use cases like doing
>>>>>>>>>>>>> multiple queries in the same
>>>>>>>>>>>>> >>>>>>>>>> Pipeline, querying based on key patterns or
>>>>>>>>>>>>> querying from multiple tables at the
>>>>>>>>>>>>> >>>>>>>>>> same time but came with some maintenance issues:
>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>> - We ended up needing to add to the ReadAll
>>>>>>>>>>>>> transforms the parameters for
>>>>>>>>>>>>> >>>>>>>>>> missing information so we ended up with lots of
>>>>>>>>>>>>> duplicated methods and
>>>>>>>>>>>>> >>>>>>>>>> error-prone code from the Read transforms into
>>>>>>>>>>>>> the ReadAll transforms.
>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>> - When you require new parameters you have to
>>>>>>>>>>>>> expand the input parameters of the
>>>>>>>>>>>>> >>>>>>>>>> intermediary specification into something that
>>>>>>>>>>>>> resembles the full `Read`
>>>>>>>>>>>>> >>>>>>>>>> definition for example imagine you want to read
>>>>>>>>>>>>> from multiple tables or
>>>>>>>>>>>>> >>>>>>>>>> servers as part of the same pipeline but this was
>>>>>>>>>>>>> not in the intermediate
>>>>>>>>>>>>> >>>>>>>>>> specification you end up adding those extra
>>>>>>>>>>>>> methods (duplicating more code)
>>>>>>>>>>>>> >>>>>>>>>> just to get close to being like the full Read
>>>>>>>>>>>>> spec.
>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>> - If new parameters are added to the Read method we
>>>>>>>>>>>>> end up adding them
>>>>>>>>>>>>> >>>>>>>>>> systematically to the ReadAll transform too so
>>>>>>>>>>>>> they are taken into account.
>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>> Due to these issues I recently did a change to test
>>>>>>>>>>>>> a new approach that is
>>>>>>>>>>>>> >>>>>>>>>> simpler, more complete and maintainable. The code
>>>>>>>>>>>>> became:
>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>,
>>>>>>>>>>>>> PCollection<Result>>
>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>> With this approach users gain benefits of
>>>>>>>>>>>>> improvements on parameters of normal
>>>>>>>>>>>>> >>>>>>>>>> Read because they count with the full Read
>>>>>>>>>>>>> parameters. But of course there are
>>>>>>>>>>>>> >>>>>>>>>> some minor caveats:
>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>> 1. You need to push some information into normal
>>>>>>>>>>>>> Reads for example
>>>>>>>>>>>>> >>>>>>>>>> partition boundaries information or Restriction
>>>>>>>>>>>>> information (in the SDF
>>>>>>>>>>>>> >>>>>>>>>> case). Notice that this consistent approach of
>>>>>>>>>>>>> ReadAll produces a simple
>>>>>>>>>>>>> >>>>>>>>>> pattern that ends up being almost reusable
>>>>>>>>>>>>> between IOs (e.g. the non-SDF
>>>>>>>>>>>>> >>>>>>>>>> case):
>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>> public static class ReadAll extends
>>>>>>>>>>>>> PTransform<PCollection<Read>,
>>>>>>>>>>>>> >>>>>>>>>> PCollection<SolrDocument>> {
>>>>>>>>>>>>> >>>>>>>>>> @Override
>>>>>>>>>>>>> >>>>>>>>>> public PCollection<SolrDocument>
>>>>>>>>>>>>> expand(PCollection<Read> input) {
>>>>>>>>>>>>> >>>>>>>>>> return input
>>>>>>>>>>>>> >>>>>>>>>> .apply("Split", ParDo.of(new SplitFn()))
>>>>>>>>>>>>> >>>>>>>>>> .apply("Reshuffle",
>>>>>>>>>>>>> Reshuffle.viaRandomKey())
>>>>>>>>>>>>> >>>>>>>>>> .apply("Read", ParDo.of(new ReadFn()));
>>>>>>>>>>>>> >>>>>>>>>> }
>>>>>>>>>>>>> >>>>>>>>>> }
>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>> 2. If you are using Generic types for the results
>>>>>>>>>>>>> ReadAll you must have the
>>>>>>>>>>>>> >>>>>>>>>> Coders used in its definition and require
>>>>>>>>>>>>> consistent types from the data
>>>>>>>>>>>>> >>>>>>>>>> sources, in practice this means we need to add
>>>>>>>>>>>>> extra withCoder method(s) on
>>>>>>>>>>>>> >>>>>>>>>> ReadAll but not the full specs.
>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow
>>>>>>>>>>>>> this ReadAll pattern. RedisIO
>>>>>>>>>>>>> >>>>>>>>>> and CassandraIO have already WIP PRs to do so. So I
>>>>>>>>>>>>> wanted to bring this subject
>>>>>>>>>>>>> >>>>>>>>>> to the mailing list to see your opinions, and if
>>>>>>>>>>>>> you see any sort of issues that
>>>>>>>>>>>>> >>>>>>>>>> we might be missing with this idea.
>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>> Also I would like to see if we have consensus to
>>>>>>>>>>>>> start using consistently the
>>>>>>>>>>>>> >>>>>>>>>> terminology of ReadAll transforms based on Read and
>>>>>>>>>>>>> the readAll() method for new
>>>>>>>>>>>>> >>>>>>>>>> IOs (at this point probably outdoing this in the
>>>>>>>>>>>>> only remaining inconsistent
>>>>>>>>>>>>> >>>>>>>>>> place in JdbcIO might not be a good idea but apart
>>>>>>>>>>>>> >>>>>>>>>> from this we should be ok).
>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>> I mention this because the recent PR on KafkaIO
>>>>>>>>>>>>> based on SDF is doing something
>>>>>>>>>>>>> >>>>>>>>>> similar to the old pattern but being called ReadAll
>>>>>>>>>>>>> and maybe it is worth to be
>>>>>>>>>>>>> >>>>>>>>>> consistent for the benefit of users.
>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>> >>>>>>>>>> Regards,
>>>>>>>>>>>>> >>>>>>>>>> Ismaël
>>>>>>>>>>>>> >>
>>>>>>>>>>>>> >>
>>>>>>>>>>>>>