On Tue, Jul 7, 2020 at 2:06 PM Luke Cwik <lc...@google.com> wrote:
>
> Robert, the intent is that the Read object would use a schema coder and for 
> XLang purposes would be no different than a POJO.

Just to clarify, you're saying that the Read PTransform would be
encoded via the schema coder? That still feels a bit odd (specifically,
if we were designing IO from scratch rather than adapting to what
already exists, would we choose to use PTransforms as elements?) but
it would solve the cross-language issue.

> The issue of how to deal with closures applies to both equally and that is 
> why I suggested to favor using data over closures. Once there is an 
> implementation for how to deal with UDFs in an XLang world, this guidance can 
> change.
>
> Kenn, I did mean specifying an enum for which the XLang expansion service 
> would return a serialized blob of code. The XLang expansion service is 
> responsible for returning an environment that contains all the dependencies 
> necessary to execute the transforms and the serialized blob of code, so this 
> would be a non-issue for the caller.
>
> From reviewing the SDF Kafka PR, the reduction in maintenance is definitely 
> there (100s of lines of duplicated boilerplate and documentation).
>
> What are the next steps to get a resolution on this?
>
> On Thu, Jul 2, 2020 at 10:38 AM Robert Bradshaw <rober...@google.com> wrote:
>>
>> On Thu, Jul 2, 2020 at 10:26 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>
>>> On Wed, Jul 1, 2020 at 4:17 PM Eugene Kirpichov <kirpic...@google.com> 
>>> wrote:
>>>>
>>>> Kenn - I don't mean an enum of common closures, I mean expressing closures 
>>>> in a restricted sub-language such as the language of SQL expressions.
>>>
>>>
>>> My lack of clarity: enums was my phrasing of Luke's item 1). I understood 
>>> what you meant. I think either a set of well-known closures or a tiny 
>>> sublanguage could add value.
>>>
>>>>
>>>> That would only work if there is a portable way to interpret SQL 
>>>> expressions, but if there isn't, maybe there should be - for the sake of, 
>>>> well, expressing closures portably. Of course these would be closures that 
>>>> only work with rows - but that seems powerful enough for many if not most 
>>>> purposes.
>>>
>>>
>>> You can choose a SQL dialect or choose the tiniest subset just for this 
>>> purpose and go with it. But when the data type going in or out of the 
>>> lambda is e.g. some Java or Python object, then what? One idea is to always 
>>> require these to be rows. But if you can really get away with a 
>>> dependency-free context-free lambda, then Javascript or Python is as doable 
>>> as SQL in terms of having a tiny restricted language for just this purpose. 
>>> I would expect once it got used, folks would start to ask to include the 
>>> rest of what the language has to offer - its ecosystem. This is always the 
>>> main design point I am interested in for "lightweight" embedded UDF 
>>> proposals.
>>
>>
>> This is getting off the topic of ReadAll, but I think being able to do 
>> arbitrary computation in preceding/succeeding transform plus a (quite) 
>> restricted language in the transform itself can go a long way. (For example, 
>> one could have a dynamic destinations write that takes a KV<element, dest> 
>> where dest is a format string like "foo-{{shard_num}}.txt" to plug in the 
>> truly dynamic pieces, but the dest string itself can be computed (e.g. based 
>> on the element) using arbitrary code in the caller language.)
>>
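A minimal sketch of the format-string idea above, in plain Java with no Beam dependency. The class DestinationTemplate and its render method are hypothetical names used only for illustration; the point is that the caller builds the template with arbitrary code, and the transform only has to substitute the truly dynamic pieces.

```java
import java.util.Map;

// Hypothetical helper, not part of Beam: fills "{{name}}" placeholders in a
// destination template with values that are only known at write time.
final class DestinationTemplate {
  private DestinationTemplate() {}

  // Replaces every "{{key}}" in the template with the matching value.
  // Placeholders without a matching key are left untouched.
  static String render(String template, Map<String, String> values) {
    String result = template;
    for (Map.Entry<String, String> e : values.entrySet()) {
      result = result.replace("{{" + e.getKey() + "}}", e.getValue());
    }
    return result;
  }
}
```

The caller language would compute something like element + "-{{shard_num}}.txt" per element, and the write transform would call render with only the shard number filled in.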
>>>
>>>
>>> Kenn
>>>
>>>>
>>>> For example, maybe the Java example:
>>>>
>>>>  PCollection<BankTransaction> transactions = ...;
>>>>  transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
>>>>      .by(Transaction::getType)
>>>>      // Convert the data to be written to CSVSink
>>>>      .via(tx -> tx.getType().toFields(tx),
>>>>           type -> new CSVSink(type.getFieldNames()))
>>>>      .to(".../path/to/")
>>>>      .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
>>>>
>>>> could be written in Python as:
>>>>
>>>> transactions | fileio.write_dynamic(
>>>>   # "it" is implicitly available in these SQL expressions as the same
>>>>   # thing as the Java lambda argument
>>>>   by="it.type",
>>>>   format="it.fields",
>>>>   # A bunch of preset sinks supported in every language?
>>>>   sink="CSV_SINK(it.type.field_names)",
>>>>   to=".../path/to/",
>>>>   naming="DEFAULT_NAMING(CONCAT(it, '-transactions'), '.csv')")
>>>>
>>>> Again, to be clear, I'm not suggesting to block what Ismael is proposing 
>>>> on getting this done - getting this done wouldn't be a short term effort, 
>>>> but seems potentially really nice.
>>>>
>>>>
>>>> On Wed, Jul 1, 2020 at 3:19 PM Robert Burke <rob...@frantil.com> wrote:
>>>>>
>>>>> From the Go side of the table, the Go language doesn't provide a 
>>>>> mechanism to serialize or access closure data, which means DoFns can't be 
>>>>> functional closures. This, combined with the move to have the "Structural 
>>>>> DoFns" be serialized using Beam Schemas, has the net result that if Go 
>>>>> transforms are used for Cross Language, they will be configurable with a 
>>>>> Schema of the configuration data.
>>>>>
>>>>> Of course, this just means that each language will probably provide 
>>>>> whichever mechanisms it likes for use of its cross language transforms.
>>>>>
>>>>> On Tue, 30 Jun 2020 at 16:07, Kenneth Knowles <k...@apache.org> wrote:
>>>>>>
>>>>>> I don't think an enum of most common closures will work. The input types 
>>>>>> are typically generics that are made concrete by the caller who also 
>>>>>> provides the closures. I think Luke's (2) is the same idea as my "Java 
>>>>>> still assembles it [using opaque Python closures/transforms]". It seems 
>>>>>> like an approach to (3). Passing over actual code could address some 
>>>>>> cases, but libraries become the issue.
>>>>>>
>>>>>> I think it is fair to say that "WriteAll" style would involve entering 
>>>>>> unexplored territory.
>>>>>>
>>>>>> On the main topic, I think Brian has a pretty strong point and his 
>>>>>> example of type conversion lambdas is a good one. I did a quick 
>>>>>> survey and every other property I could find does seem like it fits on 
>>>>>> the Read, and most IOs have a few of these closures, for example for 
>>>>>> extracting timestamps. So maybe just adopt a resolution convention of 
>>>>>> putting them on the ReadAll and having that take precedence. Then you 
>>>>>> would be deserializing a Read transform with insta-crash methods or some 
>>>>>> such?
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Tue, Jun 30, 2020 at 10:24 AM Eugene Kirpichov <kirpic...@google.com> 
>>>>>> wrote:
>>>>>>>
>>>>>>> Yeah, mainly I just feel like dynamic reads and dynamic writes (and 
>>>>>>> perhaps not-yet-invented similar transforms of other kinds) are tightly 
>>>>>>> related - they are either very similar, or are duals of each other - so 
>>>>>>> they should use the same approach. If they are using different 
>>>>>>> approaches, it is a sign that either one of them is being done wrong or 
>>>>>>> that we are running into a fundamental limitation of Beam (e.g. 
>>>>>>> difficulty of encoding closures compared to encoding elements).
>>>>>>>
>>>>>>> But I agree with Luke that we shouldn't give up on closures. Especially 
>>>>>>> with the work that has been done on schemas and SQL, I see no reason 
>>>>>>> why we couldn't express closures in a portable restricted sub-language. 
>>>>>>> If we can express SQL, we can express many or most use cases of dynamic 
>>>>>>> reads/writes - I don't mean that we should actually use SQL (though we 
>>>>>>> could - e.g. SQL scalar expressions seem powerful enough to express the 
>>>>>>> closures appearing in most use cases of FileIO.writeDynamic), I just 
>>>>>>> mean that SQL is an existence proof.
>>>>>>>
>>>>>>> (I don't want to rock the boat too much, just thought I'd chime in as 
>>>>>>> this topic is dear to my heart)
>>>>>>>
>>>>>>> On Tue, Jun 30, 2020 at 9:59 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>
>>>>>>>> Kenn, I'm not too worried about closures since:
>>>>>>>> 1) the expansion service for a transform could have a well-defined set 
>>>>>>>> of closures by name that are returned as serialized objects that 
>>>>>>>> don't need to be interpretable by the caller
>>>>>>>> 2) the language could store serialized functions of another language 
>>>>>>>> as constants
>>>>>>>> 3) generic XLang function support will eventually be needed
>>>>>>>> but I do agree that closures make things difficult to express vs 
>>>>>>>> data, which is primarily why we should prefer data over closures 
>>>>>>>> when possible and use closures when expressing it with data would be 
>>>>>>>> too cumbersome.
>>>>>>>>
>>>>>>>> Brian, so far the cases that have been migrated have shown that the 
>>>>>>>> source descriptor and the Read transform are almost the same (some 
>>>>>>>> parameters that only impact pipeline construction such as coders 
>>>>>>>> differ).
>>>>>>>>
>>>>>>>> On Mon, Jun 29, 2020 at 2:33 PM Brian Hulette <bhule...@google.com> 
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Sorry for jumping into this late and casting a vote against the 
>>>>>>>>> consensus... but I think I'd prefer standardizing on a pattern like 
>>>>>>>>> PCollection<KafkaSourceDescriptor> rather than PCollection<Read>. 
>>>>>>>>> That approach clearly separates the parameters that are allowed to 
>>>>>>>>> vary across a ReadAll (the ones defined in KafkaSourceDescriptor) 
>>>>>>>>> from the parameters that should be constant (other parameters in the 
>>>>>>>>> Read object, like SerializedFunctions for type conversions, 
>>>>>>>>> parameters for different operating modes, etc...). I think it's 
>>>>>>>>> helpful to think of the parameters that are allowed to vary as some 
>>>>>>>>> "location descriptor", but I imagine IO authors may want other 
>>>>>>>>> parameters to vary across a ReadAll as well.
>>>>>>>>>
>>>>>>>>> To me it seems safer to let an IO author "opt-in" to a parameter 
>>>>>>>>> being dynamic at execution time.
>>>>>>>>>
>>>>>>>>> Brian
>>>>>>>>>
>>>>>>>>> On Mon, Jun 29, 2020 at 9:26 AM Eugene Kirpichov 
>>>>>>>>> <kirpic...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>> I'd like to raise one more time the question of consistency between 
>>>>>>>>>> dynamic reads and dynamic writes, per my email at the beginning of 
>>>>>>>>>> the thread.
>>>>>>>>>> If the community prefers ReadAll to read from Read, then should 
>>>>>>>>>> dynamic writes write to Write?
>>>>>>>>>>
>>>>>>>>>> On Mon, Jun 29, 2020 at 8:57 AM Boyuan Zhang <boyu...@google.com> 
>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> It seems like most of us agree on the idea that ReadAll should read 
>>>>>>>>>>> from Read. I'm going to update the Kafka ReadAll with the same 
>>>>>>>>>>> pattern.
>>>>>>>>>>> Thanks for all your help!
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jun 26, 2020 at 12:12 PM Chamikara Jayalath 
>>>>>>>>>>> <chamik...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jun 26, 2020 at 11:49 AM Luke Cwik <lc...@google.com> 
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> I would also like to suggest that transforms that implement 
>>>>>>>>>>>>> ReadAll via Read should also provide methods like:
>>>>>>>>>>>>>
>>>>>>>>>>>>> // Uses the specified values if unspecified in the input element 
>>>>>>>>>>>>> from the PCollection<Read>.
>>>>>>>>>>>>> withDefaults(Read read);
>>>>>>>>>>>>> // Uses the specified values regardless of what the input element 
>>>>>>>>>>>>> from the PCollection<Read> specifies.
>>>>>>>>>>>>> withOverrides(Read read);
>>>>>>>>>>>>>
>>>>>>>>>>>>> and only add methods that are required at construction time 
>>>>>>>>>>>>> (e.g. coders). This way the majority of documentation sits on the 
>>>>>>>>>>>>> Read transform.
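A minimal sketch of the precedence rules that withDefaults and withOverrides describe, assuming a Read configuration can be modelled as a map of explicitly set properties. ReadConfigMerge and the property names in the usage note are hypothetical; this is not the KafkaIO API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Read configuration: a map holding only the
// properties that were explicitly set. Unset properties are simply absent.
final class ReadConfigMerge {
  private ReadConfigMerge() {}

  // withDefaults: values on the input element win; defaults only fill in
  // the properties the element left unset.
  static Map<String, String> withDefaults(
      Map<String, String> element, Map<String, String> defaults) {
    Map<String, String> merged = new HashMap<>(defaults);
    merged.putAll(element);
    return merged;
  }

  // withOverrides: the overrides win regardless of what the element set.
  static Map<String, String> withOverrides(
      Map<String, String> element, Map<String, String> overrides) {
    Map<String, String> merged = new HashMap<>(element);
    merged.putAll(overrides);
    return merged;
  }
}
```

For an element that sets only a topic, withDefaults would keep that topic and pick up, say, a default bootstrap server, while withOverrides would replace the topic if the override specifies one.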
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> +0 from me. Sounds like benefits outweigh the drawbacks here and 
>>>>>>>>>>>> some of the drawbacks related to cross-language can be overcome 
>>>>>>>>>>>> through future advancements.
>>>>>>>>>>>> Thanks for bringing this up Ismaël.
>>>>>>>>>>>>
>>>>>>>>>>>> - Cham
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jun 26, 2020 at 9:58 AM Luke Cwik <lc...@google.com> 
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ismael, it is good to hear that using Read as the input didn't 
>>>>>>>>>>>>>> have a bunch of parameters that were being skipped/ignored. 
>>>>>>>>>>>>>> Also, for the polymorphism issue you have to rely on the user 
>>>>>>>>>>>>>> correctly telling you the type in such a way where it is a 
>>>>>>>>>>>>>> common ancestor of all the runtime types that will ever be used. 
>>>>>>>>>>>>>> This usually boils down to something like Serializable or 
>>>>>>>>>>>>>> DynamicMessage such that the coder that is chosen works for all 
>>>>>>>>>>>>>> the runtime types. Using multiple types is a valid use case and 
>>>>>>>>>>>>>> would allow for a simpler graph with fewer flattens merging the 
>>>>>>>>>>>>>> output from multiple sources.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Boyuan, as you have mentioned we can have a coder for 
>>>>>>>>>>>>>> KafkaIO.Read which uses schemas even if some of the parameters 
>>>>>>>>>>>>>> can't be represented in a meaningful way beyond "bytes". This 
>>>>>>>>>>>>>> would be helpful for cross language as well since every 
>>>>>>>>>>>>>> parameter would become available if a language could support it 
>>>>>>>>>>>>>> (e.g. it could serialize a java function up front and keep it 
>>>>>>>>>>>>>> saved as raw bytes within said language). Even if we figure out 
>>>>>>>>>>>>>> a better way to do this in the future, we'll have to change the 
>>>>>>>>>>>>>> schema for the new way anyway. This would mean that the external 
>>>>>>>>>>>>>> version of the transform adopts Row to Read and we drop 
>>>>>>>>>>>>>> KafkaSourceDescriptor. The conversion from Row to Read could 
>>>>>>>>>>>>>> validate that the parameters make sense (e.g. the bytes are 
>>>>>>>>>>>>>> valid serialized functions). The addition of an 
>>>>>>>>>>>>>> endReadTime/endReadOffset would make sense for KafkaIO.Read as 
>>>>>>>>>>>>>> well and this would enable having a bounded version that could 
>>>>>>>>>>>>>> be used for backfills (this doesn't have to be done as part of 
>>>>>>>>>>>>>> any current ongoing PR). Essentially any parameter that could be 
>>>>>>>>>>>>>> added for a single instance of a Kafka element+restriction would 
>>>>>>>>>>>>>> also make sense to the KafkaIO.Read transform since it too is a 
>>>>>>>>>>>>>> single instance. There are parameters that would apply to the 
>>>>>>>>>>>>>> ReadAll that wouldn't apply to a read and these would be global 
>>>>>>>>>>>>>> parameters across all element+restriction pairs such as config 
>>>>>>>>>>>>>> overrides or default values.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am convinced that we should do as Ismael is suggesting and use 
>>>>>>>>>>>>>> KafkaIO.Read as the type.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:00 PM Chamikara Jayalath 
>>>>>>>>>>>>>> <chamik...@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Discussion regarding cross-language transforms is a slight 
>>>>>>>>>>>>>>> tangent here. But I think, in general, it's great if we can use 
>>>>>>>>>>>>>>> existing transforms (for example, IO connectors) as 
>>>>>>>>>>>>>>> cross-language transforms without having to build more 
>>>>>>>>>>>>>>> composites (irrespective of whether in 
>>>>>>>>>>>>>>> ExternalTransformBuilders or user pipelines) just to make 
>>>>>>>>>>>>>>> them cross-language compatible. A future cross-language 
>>>>>>>>>>>>>>> compatible SchemaCoder might help (assuming that works for Read 
>>>>>>>>>>>>>>> transform) but I'm not sure we have a good idea when we'll get 
>>>>>>>>>>>>>>> to that state.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang 
>>>>>>>>>>>>>>> <boyu...@google.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For unbounded SDF in Kafka, we also consider the 
>>>>>>>>>>>>>>>> upgrading/downgrading compatibility in the pipeline update 
>>>>>>>>>>>>>>>> scenario (for detailed discussion, please refer to 
>>>>>>>>>>>>>>>> https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E).
>>>>>>>>>>>>>>>> To obtain that compatibility, the input of the read SDF must 
>>>>>>>>>>>>>>>> be schema-aware.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thus the major constraint of mapping KafkaSourceDescriptor to 
>>>>>>>>>>>>>>>> PCollection<Read> is that KafkaIO.Read also needs to be 
>>>>>>>>>>>>>>>> schema-aware, otherwise pipeline updates might fail 
>>>>>>>>>>>>>>>> unnecessarily. Looking into KafkaIO.Read, not all necessary 
>>>>>>>>>>>>>>>> fields are compatible with schema, for example 
>>>>>>>>>>>>>>>> SerializedFunction.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm kind of confused by why ReadAll<Read, OutputT> is a common 
>>>>>>>>>>>>>>>> pattern for SDF based IO. The Read can be a common pattern 
>>>>>>>>>>>>>>>> because the input is always a PBegin. But for an SDF based IO, 
>>>>>>>>>>>>>>>> the input can be anything. By using Read as input, we will 
>>>>>>>>>>>>>>>> still have the maintenance cost when SDF IO supports a new 
>>>>>>>>>>>>>>>> field but Read doesn't consume it. For example, we are 
>>>>>>>>>>>>>>>> discussing adding endOffset and endReadTime to 
>>>>>>>>>>>>>>>> KafkaSourceDescriptor, which is not used in KafkaIO.Read.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía 
>>>>>>>>>>>>>>>> <ieme...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> We forgot to mention (5) External.Config used in cross-lang, 
>>>>>>>>>>>>>>>>> see KafkaIO
>>>>>>>>>>>>>>>>> ExternalTransformBuilder. This approach is the predecessor of 
>>>>>>>>>>>>>>>>> (4) and probably a
>>>>>>>>>>>>>>>>> really good candidate to be replaced by the Row based 
>>>>>>>>>>>>>>>>> Configuration Boyuan is
>>>>>>>>>>>>>>>>> envisioning (so good to be aware of this).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for the clear explanation, Luke; you mention the real
>>>>>>>>>>>>>>>>> issue(s). All the approaches discussed so far could in the
>>>>>>>>>>>>>>>>> end be easily transformed to produce a PCollection<Read>, and
>>>>>>>>>>>>>>>>> those Read elements could be read by the generic ReadAll
>>>>>>>>>>>>>>>>> transform. Notice that this can be internal in some IOs, e.g.
>>>>>>>>>>>>>>>>> KafkaIO, if they decide not to expose it. I am not saying that
>>>>>>>>>>>>>>>>> we should force every IO to support ReadAll in its public API,
>>>>>>>>>>>>>>>>> but if we do, it is probably a good idea to be consistent and
>>>>>>>>>>>>>>>>> name the transform that expects an input PCollection<Read> in
>>>>>>>>>>>>>>>>> the same way. Also notice that using it will save us from the
>>>>>>>>>>>>>>>>> maintenance issues discussed in my previous email.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Back to the main concern: the consequences of expansion based
>>>>>>>>>>>>>>>>> on Read. So far I have not seen consequences for the Splitting
>>>>>>>>>>>>>>>>> part, which maps really nicely assuming the Partition info /
>>>>>>>>>>>>>>>>> Restriction is available as part of Read. So far there are no
>>>>>>>>>>>>>>>>> Serialization issues either, because Beam is already enforcing
>>>>>>>>>>>>>>>>> this. Notice that ReadAll expansion is almost ‘equivalent’ to
>>>>>>>>>>>>>>>>> a poor man's SDF, at least for the Bounded case (see the code
>>>>>>>>>>>>>>>>> in my previous email). For the other points:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> > a) Do all properties set on a Read apply to the ReadAll? 
>>>>>>>>>>>>>>>>> > For example, the
>>>>>>>>>>>>>>>>> > Kafka Read implementation allows you to set the key and 
>>>>>>>>>>>>>>>>> > value deserializers
>>>>>>>>>>>>>>>>> > which are also used to dictate the output PCollection type. 
>>>>>>>>>>>>>>>>> > It also allows you
>>>>>>>>>>>>>>>>> > to set how the watermark should be computed. Technically a 
>>>>>>>>>>>>>>>>> > user may want the
>>>>>>>>>>>>>>>>> > watermark computation to be configurable per Read and they 
>>>>>>>>>>>>>>>>> > may also want an
>>>>>>>>>>>>>>>>> > output type which is polymorphic (e.g. 
>>>>>>>>>>>>>>>>> > PCollection<Serializable>).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Most of the time they do, but for parametric types we cannot 
>>>>>>>>>>>>>>>>> support different
>>>>>>>>>>>>>>>>> types in the outputs of the Read or at least I did not find 
>>>>>>>>>>>>>>>>> how to do so (is
>>>>>>>>>>>>>>>>> there a way to use multiple output Coders on Beam?), we saw 
>>>>>>>>>>>>>>>>> this in CassandraIO
>>>>>>>>>>>>>>>>> and we were discussing adding explicitly these Coders or 
>>>>>>>>>>>>>>>>> Serializer
>>>>>>>>>>>>>>>>> specific methods to the ReadAll transform. This is less nice 
>>>>>>>>>>>>>>>>> because it will
>>>>>>>>>>>>>>>>> imply some repeated methods, but it is still a compromise to 
>>>>>>>>>>>>>>>>> gain the other
>>>>>>>>>>>>>>>>> advantages. I suppose the watermark case you mention is 
>>>>>>>>>>>>>>>>> similar because you may
>>>>>>>>>>>>>>>>> want the watermark to behave differently in each Read and we 
>>>>>>>>>>>>>>>>> probably don’t
>>>>>>>>>>>>>>>>> support this, so it corresponds to the polymorphic category.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> > b) Read extends PTransform which brings its own object 
>>>>>>>>>>>>>>>>> > modelling concerns.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> > During the implementations of ReadAll(PCollection<Read>), 
>>>>>>>>>>>>>>>>> > was it discovered
>>>>>>>>>>>>>>>>> > that some properties became runtime errors or were ignored 
>>>>>>>>>>>>>>>>> > if they were set?
>>>>>>>>>>>>>>>>> > If no, then the code deduplication is likely worth it 
>>>>>>>>>>>>>>>>> > because we also get a
>>>>>>>>>>>>>>>>> > lot of javadoc deduplication, but if yes is this an 
>>>>>>>>>>>>>>>>> > acceptable user
>>>>>>>>>>>>>>>>> > experience?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> No, not so far. This is an interesting part, notice that the 
>>>>>>>>>>>>>>>>> Read translation
>>>>>>>>>>>>>>>>> ends up delegating the read bits to the ReadFn part of 
>>>>>>>>>>>>>>>>> ReadAll so the ReadFn is
>>>>>>>>>>>>>>>>> the real read and must be aware and use all the parameters.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>     public PCollection<SolrDocument> expand(PBegin input) {
>>>>>>>>>>>>>>>>>       return input.apply("Create", Create.of(this)).apply("ReadAll", readAll());
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I might be missing something for the Unbounded SDF case which 
>>>>>>>>>>>>>>>>> is the only case
>>>>>>>>>>>>>>>>> we have not explored so far. I think one easy way to see the 
>>>>>>>>>>>>>>>>> limitations would
>>>>>>>>>>>>>>>>> be in the ongoing KafkaIO SDF based implementation to try to 
>>>>>>>>>>>>>>>>> map
>>>>>>>>>>>>>>>>> KafkaSourceDescriptor to do the extra PCollection<Read> and 
>>>>>>>>>>>>>>>>> the Read logic on
>>>>>>>>>>>>>>>>> the ReadAll with the SDF to see which constraints we hit, the 
>>>>>>>>>>>>>>>>> polymorphic ones
>>>>>>>>>>>>>>>>> will be there for sure, maybe others will appear (not sure). 
>>>>>>>>>>>>>>>>> However it would be
>>>>>>>>>>>>>>>>> interesting to see if we have a real gain in the maintenance 
>>>>>>>>>>>>>>>>> points, but well
>>>>>>>>>>>>>>>>> let’s not forget also that KafkaIO has a LOT of knobs so 
>>>>>>>>>>>>>>>>> probably the generic
>>>>>>>>>>>>>>>>> implementation could be relatively complex.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <lc...@google.com> 
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> > I had mentioned that approach 1 and approach 2 work for 
>>>>>>>>>>>>>>>>> > cross language. The difference being that the cross 
>>>>>>>>>>>>>>>>> > language transform would take a well known definition and 
>>>>>>>>>>>>>>>>> > convert it to the Read transform. A normal user would have 
>>>>>>>>>>>>>>>>> > a pipeline that would look like:
>>>>>>>>>>>>>>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> 
>>>>>>>>>>>>>>>>> > PCollection<Output>
>>>>>>>>>>>>>>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) -> 
>>>>>>>>>>>>>>>>> > PCollection<Output>
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> > And in the cross language case this would look like:
>>>>>>>>>>>>>>>>> > 1: PCollection<Row of SourceDescriptor> -> 
>>>>>>>>>>>>>>>>> > PTransform(Convert Row to Read) -> PCollection<Read> -> 
>>>>>>>>>>>>>>>>> > PTransform(ReadAll) -> PCollection<Output>
>>>>>>>>>>>>>>>>> > 2: PCollection<Row of SourceDescriptor> -> 
>>>>>>>>>>>>>>>>> > PTransform(Convert Row to SourceDescriptor) -> 
>>>>>>>>>>>>>>>>> > PCollection<SourceDescriptor> -> PTransform(ReadAll) -> 
>>>>>>>>>>>>>>>>> > PCollection<Output>*
>>>>>>>>>>>>>>>>> > * note that PTransform(Convert Row to SourceDescriptor) 
>>>>>>>>>>>>>>>>> > only exists since we haven't solved how to use schemas with 
>>>>>>>>>>>>>>>>> > language bound types in a cross language way. SchemaCoder 
>>>>>>>>>>>>>>>>> > isn't portable but RowCoder is which is why the conversion 
>>>>>>>>>>>>>>>>> > step exists. We could have a solution for this at some 
>>>>>>>>>>>>>>>>> > point in time.
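A minimal sketch of the "Convert Row to SourceDescriptor" step described above, with validation at conversion time. A plain Map stands in for a Beam Row so the example has no Beam dependency, and the SourceDescriptor class with its topic, partition and serializedFn fields is hypothetical, not KafkaIO's actual descriptor.

```java
import java.util.Map;

// Hypothetical language-bound descriptor; field names are illustrative only.
final class SourceDescriptor {
  final String topic;
  final int partition;
  // Opaque bytes, e.g. a function serialized up front by another SDK.
  // May be null when no function was supplied.
  final byte[] serializedFn;

  SourceDescriptor(String topic, int partition, byte[] serializedFn) {
    this.topic = topic;
    this.partition = partition;
    this.serializedFn = serializedFn;
  }

  // Converts a portable row (here a Map stand-in) into the language-bound
  // type, rejecting rows whose fields are missing or of the wrong type.
  static SourceDescriptor fromRow(Map<String, Object> row) {
    Object topic = row.get("topic");
    if (!(topic instanceof String) || ((String) topic).isEmpty()) {
      throw new IllegalArgumentException("topic must be a non-empty string");
    }
    Object partition = row.get("partition");
    if (!(partition instanceof Integer) || ((Integer) partition) < 0) {
      throw new IllegalArgumentException("partition must be a non-negative int");
    }
    Object fn = row.get("serializedFn");
    if (fn != null && !(fn instanceof byte[])) {
      throw new IllegalArgumentException("serializedFn must be bytes if present");
    }
    return new SourceDescriptor((String) topic, (Integer) partition, (byte[]) fn);
  }
}
```

The same shape would apply to a Row to Read conversion: the portable side only ever sees rows, and the conversion is the one place where malformed parameters are rejected.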
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> > My concern with using Read was around:
>>>>>>>>>>>>>>>>> > a) Do all properties set on a Read apply to the ReadAll? 
>>>>>>>>>>>>>>>>> > For example, the Kafka Read implementation allows you to 
>>>>>>>>>>>>>>>>> > set the key and value deserializers which are also used to 
>>>>>>>>>>>>>>>>> > dictate the output PCollection type. It also allows you to 
>>>>>>>>>>>>>>>>> > set how the watermark should be computed. Technically a 
>>>>>>>>>>>>>>>>> > user may want the watermark computation to be configurable 
>>>>>>>>>>>>>>>>> > per Read and they may also want an output type which is 
>>>>>>>>>>>>>>>>> > polymorphic (e.g. PCollection<Serializable>).
>>>>>>>>>>>>>>>>> > b) Read extends PTransform which brings its own object 
>>>>>>>>>>>>>>>>> > modelling concerns.
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> > During the implementations of ReadAll(PCollection<Read>), 
>>>>>>>>>>>>>>>>> > was it discovered that some properties became runtime 
>>>>>>>>>>>>>>>>> > errors or were ignored if they were set? If no, then the 
>>>>>>>>>>>>>>>>> > code deduplication is likely worth it because we also get a 
>>>>>>>>>>>>>>>>> > lot of javadoc deduplication, but if yes is this an 
>>>>>>>>>>>>>>>>> > acceptable user experience?
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko 
>>>>>>>>>>>>>>>>> > <aromanenko....@gmail.com> wrote:
>>>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>>>> >> I believe that the initial goal of unifying ReadAll as a 
>>>>>>>>>>>>>>>>> >> general "PTransform<PCollection<Read>, 
>>>>>>>>>>>>>>>>> >> PCollection<OutputType>>" was to reduce the amount of code 
>>>>>>>>>>>>>>>>> >> duplication and the error-prone approach related to it. It 
>>>>>>>>>>>>>>>>> >> makes a lot of sense since usually we have all needed 
>>>>>>>>>>>>>>>>> >> configuration set in Read objects and, as Ismaël 
>>>>>>>>>>>>>>>>> >> mentioned, ReadAll will consist mostly of only 
>>>>>>>>>>>>>>>>> >> Split-Shuffle-Read stages. So this case usually can be 
>>>>>>>>>>>>>>>>> >> unified by using PCollection<Read> as input.
>>>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>>>> >> On the other hand, we have another need to use Java IOs as 
>>>>>>>>>>>>>>>>> >> cross-language transforms (as Luke described), which seems 
>>>>>>>>>>>>>>>>> >> to have only partly in common with the previous pattern of 
>>>>>>>>>>>>>>>>> >> using ReadAll.
>>>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>>>> >> I’d be more in favour of having only one concept of read 
>>>>>>>>>>>>>>>>> >> configuration for all needs, but it seems that’s not easy, 
>>>>>>>>>>>>>>>>> >> so I’d be more in favour of Luke and Boyuan’s approach with 
>>>>>>>>>>>>>>>>> >> schema. Though maybe ReadAll is not a very suitable name 
>>>>>>>>>>>>>>>>> >> in this case because it can cause some confusion 
>>>>>>>>>>>>>>>>> >> with the previous pattern of ReadAll uses.
>>>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang 
>>>>>>>>>>>>>>>>> >> <boyu...@google.com> wrote:
>>>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>>>> >> Sorry for the typo. I mean I think we can go with (3) and 
>>>>>>>>>>>>>>>>> >> (4): use the data type that is schema-aware as the input 
>>>>>>>>>>>>>>>>> >> of ReadAll.
>>>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang 
>>>>>>>>>>>>>>>>> >> <boyu...@google.com> wrote:
>>>>>>>>>>>>>>>>> >>>
>>>>>>>>>>>>>>>>> >>> Thanks for the summary, Cham!
>>>>>>>>>>>>>>>>> >>>
>>>>>>>>>>>>>>>>> >>> I think we can go with (2) and (4): use the data type 
>>>>>>>>>>>>>>>>> >>> that is schema-aware as the input of ReadAll.
>>>>>>>>>>>>>>>>> >>>
>>>>>>>>>>>>>>>>> >>> Converting Read into ReadAll helps us to stick with 
>>>>>>>>>>>>>>>>> >>> SDF-like IO. But only having (3) is not enough to solve 
>>>>>>>>>>>>>>>>> >>> the problem of using ReadAll in x-lang case.
>>>>>>>>>>>>>>>>> >>>
>>>>>>>>>>>>>>>>> >>> The key point of ReadAll is that the input type of 
>>>>>>>>>>>>>>>>> >>> ReadAll should be able to cross language boundaries and 
>>>>>>>>>>>>>>>>> >>> stay compatible across pipeline upgrades/downgrades. After 
>>>>>>>>>>>>>>>>> >>> investigating some possibilities (pure Java POJO with 
>>>>>>>>>>>>>>>>> >>> custom coder, protobuf, row/schema) in Kafka usage, we 
>>>>>>>>>>>>>>>>> >>> find that row/schema fits our needs most. Here comes (4). 
>>>>>>>>>>>>>>>>> >>> I believe that using Read as input of ReadAll makes sense 
>>>>>>>>>>>>>>>>> >>> in some cases, but I also think not all IOs have the same 
>>>>>>>>>>>>>>>>> >>> need. I would treat Read as a special type as long as the 
>>>>>>>>>>>>>>>>> >>> Read is schema-aware.
>>>>>>>>>>>>>>>>> >>>
>>>>>>>>>>>>>>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath 
>>>>>>>>>>>>>>>>> >>> <chamik...@google.com> wrote:
>>>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>>>> >>>> I see. So it seems like there are three options 
>>>>>>>>>>>>>>>>> >>>> discussed so far when it comes to defining source 
>>>>>>>>>>>>>>>>> >>>> descriptors for ReadAll type transforms
>>>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>>>> >>>> (1) Use Read PTransform as the element type of the input 
>>>>>>>>>>>>>>>>> >>>> PCollection
>>>>>>>>>>>>>>>>> >>>> (2) Use a POJO that describes the source as the data 
>>>>>>>>>>>>>>>>> >>>> element of the input PCollection
>>>>>>>>>>>>>>>>> >>>> (3) Provide a converter as a function to the Read 
>>>>>>>>>>>>>>>>> >>>> transform which essentially will convert it to a ReadAll 
>>>>>>>>>>>>>>>>> >>>> (what Eugene mentioned)
>>>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>>>> >>>> I feel like (3) is more suitable for a related set of 
>>>>>>>>>>>>>>>>> >>>> source descriptions such as files.
>>>>>>>>>>>>>>>>> >>>> (1) will allow the most code reuse but seems like it will 
>>>>>>>>>>>>>>>>> >>>> make it hard to use the ReadAll transform as a 
>>>>>>>>>>>>>>>>> >>>> cross-language transform and will break the separation of 
>>>>>>>>>>>>>>>>> >>>> construction time and runtime constructs
>>>>>>>>>>>>>>>>> >>>> (2) could result in less code reuse if not careful but 
>>>>>>>>>>>>>>>>> >>>> will make the transform easier to use as a 
>>>>>>>>>>>>>>>>> >>>> cross-language transform without additional modifications
>>>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>>>> >>>> Also, with SDF, we can create ReadAll-like transforms 
>>>>>>>>>>>>>>>>> >>>> that are more efficient. So we might be able to just 
>>>>>>>>>>>>>>>>> >>>> define all sources in that format and make Read 
>>>>>>>>>>>>>>>>> >>>> transforms just an easy to use composite built on top of 
>>>>>>>>>>>>>>>>> >>>> that (by adding a preceding Create transform).
>>>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>>>> >>>> Thanks,
>>>>>>>>>>>>>>>>> >>>> Cham
>>>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik 
>>>>>>>>>>>>>>>>> >>>> <lc...@google.com> wrote:
>>>>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>>>>> >>>>> I believe we do require PTransforms to be serializable 
>>>>>>>>>>>>>>>>> >>>>> since anonymous DoFns typically capture the enclosing 
>>>>>>>>>>>>>>>>> >>>>> PTransform.
>>>>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath 
>>>>>>>>>>>>>>>>> >>>>> <chamik...@google.com> wrote:
>>>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>>>> >>>>>> Seems like Read in PCollection<Read> refers to a 
>>>>>>>>>>>>>>>>> >>>>>> transform, at least here: 
>>>>>>>>>>>>>>>>> >>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>>>> >>>>>> I'm in favour of separating construction time 
>>>>>>>>>>>>>>>>> >>>>>> transforms from execution time data objects that we 
>>>>>>>>>>>>>>>>> >>>>>> store in PCollections as Luke mentioned. Also, we 
>>>>>>>>>>>>>>>>> >>>>>> don't guarantee that PTransform is serializable so 
>>>>>>>>>>>>>>>>> >>>>>> users have the additional complexity of providing a 
>>>>>>>>>>>>>>>>> >>>>>> coder whenever a PTransform is used as a data object.
>>>>>>>>>>>>>>>>> >>>>>> Also, agree with Boyuan that using simple Java objects 
>>>>>>>>>>>>>>>>> >>>>>> that are convertible to Beam Rows allows us to make 
>>>>>>>>>>>>>>>>> >>>>>> these transforms available to other SDKs through the 
>>>>>>>>>>>>>>>>> >>>>>> cross-language transforms. Using transforms or complex 
>>>>>>>>>>>>>>>>> >>>>>> sources as data objects will probably make this 
>>>>>>>>>>>>>>>>> >>>>>> difficult.
>>>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>>>> >>>>>> Thanks,
>>>>>>>>>>>>>>>>> >>>>>> Cham
>>>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang 
>>>>>>>>>>>>>>>>> >>>>>> <boyu...@google.com> wrote:
>>>>>>>>>>>>>>>>> >>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>> Hi Ismael,
>>>>>>>>>>>>>>>>> >>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>> I think the ReadAll in the IO connector refers to the 
>>>>>>>>>>>>>>>>> >>>>>>> IO with SDF implementation regardless of the type of input, 
>>>>>>>>>>>>>>>>> >>>>>>> where Read refers to UnboundedSource.  One major 
>>>>>>>>>>>>>>>>> >>>>>>> pushback of using KafkaIO.Read as source description 
>>>>>>>>>>>>>>>>> >>>>>>> is that not all configurations of KafkaIO.Read are 
>>>>>>>>>>>>>>>>> >>>>>>> meaningful to populate during execution time. Also 
>>>>>>>>>>>>>>>>> >>>>>>> when thinking about x-lang usage, making the source 
>>>>>>>>>>>>>>>>> >>>>>>> description work across language boundaries is also 
>>>>>>>>>>>>>>>>> >>>>>>> necessary.  As Luke mentioned, it's quite easy to 
>>>>>>>>>>>>>>>>> >>>>>>> infer a Schema from an AutoValue object: 
>>>>>>>>>>>>>>>>> >>>>>>> KafkaSourceDescription.java. Then the coder of this 
>>>>>>>>>>>>>>>>> >>>>>>> schema-aware object will be a SchemaCoder. When 
>>>>>>>>>>>>>>>>> >>>>>>> crossing language boundaries, it's also easy to 
>>>>>>>>>>>>>>>>> >>>>>>> convert a Row into the source description: 
>>>>>>>>>>>>>>>>> >>>>>>> Convert.fromRows.
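A minimal sketch of the descriptor-to-row round trip described above, in plain Java. It does not use Beam's schema inference or `Convert.fromRows`; a `Map` stands in for a Row, and the `SourceDescriptor` class with its `topic`, `partition` and `startOffset` fields is a hypothetical example, not the actual Kafka descriptor.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java stand-in; a Map plays the role of a schema'd Row. */
final class DescriptorRows {
  /** Hypothetical descriptor; the field names are illustrative only. */
  static final class SourceDescriptor {
    final String topic;
    final int partition;
    final long startOffset;

    SourceDescriptor(String topic, int partition, long startOffset) {
      this.topic = topic;
      this.partition = partition;
      this.startOffset = startOffset;
    }
  }

  /** Flattens the descriptor into named, primitive-typed fields. */
  static Map<String, Object> toRow(SourceDescriptor descriptor) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("topic", descriptor.topic);
    row.put("partition", descriptor.partition);
    row.put("start_offset", descriptor.startOffset);
    return row;
  }

  /** Rebuilds the language-specific object on the receiving side. */
  static SourceDescriptor fromRow(Map<String, Object> row) {
    return new SourceDescriptor(
        (String) row.get("topic"),
        (Integer) row.get("partition"),
        (Long) row.get("start_offset"));
  }
}
```

The point of the design is that only names and simple field types cross the boundary, so another SDK can produce the same row without knowing the Java class.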
>>>>>>>>>>>>>>>>> >>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik 
>>>>>>>>>>>>>>>>> >>>>>>> <lc...@google.com> wrote:
>>>>>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>> To provide additional context, the KafkaIO ReadAll 
>>>>>>>>>>>>>>>>> >>>>>>>> transform takes a 
>>>>>>>>>>>>>>>>> >>>>>>>> PCollection<KafkaSourceDescriptor>. This 
>>>>>>>>>>>>>>>>> >>>>>>>> KafkaSourceDescriptor is a POJO that contains the 
>>>>>>>>>>>>>>>>> >>>>>>>> configurable parameters for reading from Kafka. This 
>>>>>>>>>>>>>>>>> >>>>>>>> is different from the pattern that Ismael listed 
>>>>>>>>>>>>>>>>> >>>>>>>> because they take PCollection<Read> as input and the 
>>>>>>>>>>>>>>>>> >>>>>>>> Read is the same as the Read PTransform class used 
>>>>>>>>>>>>>>>>> >>>>>>>> for the non read all case.
>>>>>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>> The KafkaSourceDescriptor does lead to duplication 
>>>>>>>>>>>>>>>>> >>>>>>>> since parameters used to configure the transform 
>>>>>>>>>>>>>>>>> >>>>>>>> have to be copied over to the source descriptor but 
>>>>>>>>>>>>>>>>> >>>>>>>> decouples how a transform is specified from the 
>>>>>>>>>>>>>>>>> >>>>>>>> object that describes what needs to be done. I 
>>>>>>>>>>>>>>>>> >>>>>>>> believe Ismael's point is that we wouldn't need such 
>>>>>>>>>>>>>>>>> >>>>>>>> a decoupling.
>>>>>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>> Another area that hasn't been discussed and I 
>>>>>>>>>>>>>>>>> >>>>>>>> believe is a non-issue is that the Beam Java SDK has 
>>>>>>>>>>>>>>>>> >>>>>>>> the most IO connectors and we would want to use the 
>>>>>>>>>>>>>>>>> >>>>>>>> IO implementations within Beam Go and Beam Python. 
>>>>>>>>>>>>>>>>> >>>>>>>> This brings in its own set of issues related to 
>>>>>>>>>>>>>>>>> >>>>>>>> versioning and compatibility for the wire format and 
>>>>>>>>>>>>>>>>> >>>>>>>> how one parameterizes such transforms. The wire 
>>>>>>>>>>>>>>>>> >>>>>>>> format issue can be solved with either approach by 
>>>>>>>>>>>>>>>>> >>>>>>>> making sure that the cross language expansion always 
>>>>>>>>>>>>>>>>> >>>>>>>> takes the well known format (whatever it may be) and 
>>>>>>>>>>>>>>>>> >>>>>>>> converts it into Read/KafkaSourceDescriptor/... 
>>>>>>>>>>>>>>>>> >>>>>>>> object that is then passed to the ReadAll transform. 
>>>>>>>>>>>>>>>>> >>>>>>>> Boyuan has been looking to make the 
>>>>>>>>>>>>>>>>> >>>>>>>> KafkaSourceDescriptor have a schema so it can be 
>>>>>>>>>>>>>>>>> >>>>>>>> represented as a row and this can be done easily 
>>>>>>>>>>>>>>>>> >>>>>>>> using the AutoValue integration (I don't believe 
>>>>>>>>>>>>>>>>> >>>>>>>> there is anything preventing someone from writing a 
>>>>>>>>>>>>>>>>> >>>>>>>> schema row -> Read -> row adapter or also using the 
>>>>>>>>>>>>>>>>> >>>>>>>> AutoValue configuration if the transform is also an 
>>>>>>>>>>>>>>>>> >>>>>>>> AutoValue).
>>>>>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>> I would be more for the code duplication and 
>>>>>>>>>>>>>>>>> >>>>>>>> separation of concerns provided by using a different 
>>>>>>>>>>>>>>>>> >>>>>>>> object to represent the contents of the PCollection 
>>>>>>>>>>>>>>>>> >>>>>>>> from the pipeline construction time PTransform.
>>>>>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov 
>>>>>>>>>>>>>>>>> >>>>>>>> <kirpic...@google.com> wrote:
>>>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>> Hi Ismael,
>>>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>> Thanks for taking this on. Have you considered an 
>>>>>>>>>>>>>>>>> >>>>>>>>> approach similar (or dual) to FileIO.write(), where 
>>>>>>>>>>>>>>>>> >>>>>>>>> we in a sense also have to configure a dynamic 
>>>>>>>>>>>>>>>>> >>>>>>>>> number of different IO transforms of the same type 
>>>>>>>>>>>>>>>>> >>>>>>>>> (file writes)?
>>>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>> E.g. how in this example we configure many aspects 
>>>>>>>>>>>>>>>>> >>>>>>>>> of many file writes:
>>>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType, 
>>>>>>>>>>>>>>>>> >>>>>>>>> Transaction>writeDynamic()
>>>>>>>>>>>>>>>>> >>>>>>>>>      .by(Transaction::getType)
>>>>>>>>>>>>>>>>> >>>>>>>>>      .via(tx -> tx.getType().toFields(tx),  // 
>>>>>>>>>>>>>>>>> >>>>>>>>> Convert the data to be written to CSVSink
>>>>>>>>>>>>>>>>> >>>>>>>>>           type -> new CSVSink(type.getFieldNames()))
>>>>>>>>>>>>>>>>> >>>>>>>>>      .to(".../path/to/")
>>>>>>>>>>>>>>>>> >>>>>>>>>      .withNaming(type -> defaultNaming(type + 
>>>>>>>>>>>>>>>>> >>>>>>>>> "-transactions", ".csv"));
>>>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>> we could do something similar for many JdbcIO reads:
>>>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>> PCollection<Bar> bars;  // user-specific type from 
>>>>>>>>>>>>>>>>> >>>>>>>>> which all the read parameters can be inferred
>>>>>>>>>>>>>>>>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, 
>>>>>>>>>>>>>>>>> >>>>>>>>> Moo>readAll()
>>>>>>>>>>>>>>>>> >>>>>>>>>   .fromQuery(bar -> ...compute query for this 
>>>>>>>>>>>>>>>>> >>>>>>>>> bar...)
>>>>>>>>>>>>>>>>> >>>>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
>>>>>>>>>>>>>>>>> >>>>>>>>>   .withBatchSize(bar -> ...compute batch size for 
>>>>>>>>>>>>>>>>> >>>>>>>>> this bar...)
>>>>>>>>>>>>>>>>> >>>>>>>>>   ...etc);
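A minimal sketch of a ReadAll configured by per-element functions, in plain Java. This is not JdbcIO: `DynamicReadAll`, `create`, `fromQuery`, `withBatchSize` and `plan` are hypothetical names chosen to mirror the shape of the example above, and `plan` only resolves the functions for each element instead of running a query.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Plain-Java stand-in for a ReadAll whose settings are functions of the element. */
final class DynamicReadAll<E> {
  private final Function<E, String> queryFn;
  private final Function<E, Integer> batchSizeFn;

  private DynamicReadAll(Function<E, String> queryFn, Function<E, Integer> batchSizeFn) {
    this.queryFn = queryFn;
    this.batchSizeFn = batchSizeFn;
  }

  /** Starts with placeholder defaults: empty query, batch size 1. */
  static <E> DynamicReadAll<E> create() {
    return new DynamicReadAll<>(e -> "", e -> 1);
  }

  /** Returns a copy whose query is computed from each input element. */
  DynamicReadAll<E> fromQuery(Function<E, String> fn) {
    return new DynamicReadAll<>(fn, batchSizeFn);
  }

  /** Returns a copy whose batch size is computed from each input element. */
  DynamicReadAll<E> withBatchSize(Function<E, Integer> fn) {
    return new DynamicReadAll<>(queryFn, fn);
  }

  /** Resolves the settings for every element; a real IO would execute the query here. */
  List<String> plan(List<E> elements) {
    List<String> out = new ArrayList<>();
    for (E element : elements) {
      out.add(queryFn.apply(element) + " [batch=" + batchSizeFn.apply(element) + "]");
    }
    return out;
  }
}
```

Note that every setting here is a closure, so this shape inherits the question of how closures would be expressed across language boundaries.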
>>>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía 
>>>>>>>>>>>>>>>>> >>>>>>>>> <ieme...@gmail.com> wrote:
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> (my excuses for the long email but this requires 
>>>>>>>>>>>>>>>>> >>>>>>>>>> context)
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> As part of the move from Source based IOs to DoFn 
>>>>>>>>>>>>>>>>> >>>>>>>>>> based ones, one pattern
>>>>>>>>>>>>>>>>> >>>>>>>>>> emerged due to the composable nature of DoFn. The 
>>>>>>>>>>>>>>>>> >>>>>>>>>> idea is to have a different
>>>>>>>>>>>>>>>>> >>>>>>>>>> kind of composable reads where we take a 
>>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection of different sorts of
>>>>>>>>>>>>>>>>> >>>>>>>>>> intermediate specifications e.g. tables, queries, 
>>>>>>>>>>>>>>>>> >>>>>>>>>> etc, for example:
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> JdbcIO:
>>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends
>>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<ParameterT>, 
>>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<OutputT>>
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> RedisIO:
>>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<String>, 
>>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<KV<String, String>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends 
>>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<HBaseQuery>, 
>>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<Result>>
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> These patterns enabled richer use cases like doing 
>>>>>>>>>>>>>>>>> >>>>>>>>>> multiple queries in the same
>>>>>>>>>>>>>>>>> >>>>>>>>>> Pipeline, querying based on key patterns or 
>>>>>>>>>>>>>>>>> >>>>>>>>>> querying from multiple tables at the
>>>>>>>>>>>>>>>>> >>>>>>>>>> same time but came with some maintenance issues:
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> - We ended up needing to add to the ReadAll 
>>>>>>>>>>>>>>>>> >>>>>>>>>> transforms the parameters for
>>>>>>>>>>>>>>>>> >>>>>>>>>>   missing information so we ended up with lots of 
>>>>>>>>>>>>>>>>> >>>>>>>>>> duplicated `with*` methods and
>>>>>>>>>>>>>>>>> >>>>>>>>>>   error-prone code copied from the Read transforms into 
>>>>>>>>>>>>>>>>> >>>>>>>>>> the ReadAll transforms.
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> - When you require new parameters you have to 
>>>>>>>>>>>>>>>>> >>>>>>>>>> expand the input parameters of the
>>>>>>>>>>>>>>>>> >>>>>>>>>>   intermediary specification into something that 
>>>>>>>>>>>>>>>>> >>>>>>>>>> resembles the full `Read`
>>>>>>>>>>>>>>>>> >>>>>>>>>>   definition. For example, imagine you want to read 
>>>>>>>>>>>>>>>>> >>>>>>>>>> from multiple tables or
>>>>>>>>>>>>>>>>> >>>>>>>>>>   servers as part of the same pipeline but this 
>>>>>>>>>>>>>>>>> >>>>>>>>>> was not in the intermediate
>>>>>>>>>>>>>>>>> >>>>>>>>>>   specification; you end up adding those extra 
>>>>>>>>>>>>>>>>> >>>>>>>>>> methods (duplicating more code)
>>>>>>>>>>>>>>>>> >>>>>>>>>>   just to get close to the full Read 
>>>>>>>>>>>>>>>>> >>>>>>>>>> spec.
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> - If new parameters are added to the Read method 
>>>>>>>>>>>>>>>>> >>>>>>>>>> we end up adding them
>>>>>>>>>>>>>>>>> >>>>>>>>>>   systematically to the ReadAll transform too so 
>>>>>>>>>>>>>>>>> >>>>>>>>>> they are taken into account.
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> Due to these issues I recently did a change to 
>>>>>>>>>>>>>>>>> >>>>>>>>>> test a new approach that is
>>>>>>>>>>>>>>>>> >>>>>>>>>> simpler, more complete and maintainable. The code 
>>>>>>>>>>>>>>>>> >>>>>>>>>> became:
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>, 
>>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<Result>>
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> With this approach users gain the benefits of 
>>>>>>>>>>>>>>>>> >>>>>>>>>> improvements to the parameters of normal
>>>>>>>>>>>>>>>>> >>>>>>>>>> Read because they can rely on the full Read 
>>>>>>>>>>>>>>>>> >>>>>>>>>> parameters. But of course there are
>>>>>>>>>>>>>>>>> >>>>>>>>>> some minor caveats:
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> 1. You need to push some information into normal 
>>>>>>>>>>>>>>>>> >>>>>>>>>> Reads for example
>>>>>>>>>>>>>>>>> >>>>>>>>>>    partition boundaries information or Restriction 
>>>>>>>>>>>>>>>>> >>>>>>>>>> information (in the SDF
>>>>>>>>>>>>>>>>> >>>>>>>>>>    case).  Notice that this consistent approach of 
>>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll produces a simple
>>>>>>>>>>>>>>>>> >>>>>>>>>>    pattern that ends up being almost reusable 
>>>>>>>>>>>>>>>>> >>>>>>>>>> between IOs (e.g. the non-SDF
>>>>>>>>>>>>>>>>> >>>>>>>>>>    case):
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>>   public static class ReadAll extends 
>>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<Read>,
>>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<SolrDocument>> {
>>>>>>>>>>>>>>>>> >>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>> >>>>>>>>>>     public PCollection<SolrDocument> 
>>>>>>>>>>>>>>>>> >>>>>>>>>> expand(PCollection<Read> input) {
>>>>>>>>>>>>>>>>> >>>>>>>>>>       return input
>>>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Reshuffle", 
>>>>>>>>>>>>>>>>> >>>>>>>>>> Reshuffle.viaRandomKey())
>>>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>>>>>>>>>>>>>>>> >>>>>>>>>>     }
>>>>>>>>>>>>>>>>> >>>>>>>>>>   }
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> 2. If you are using generic types for the results of 
>>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll you must have the
>>>>>>>>>>>>>>>>> >>>>>>>>>>    Coders used in its definition and require 
>>>>>>>>>>>>>>>>> >>>>>>>>>> consistent types from the data
>>>>>>>>>>>>>>>>> >>>>>>>>>>    sources; in practice this means we need to add 
>>>>>>>>>>>>>>>>> >>>>>>>>>> extra withCoder method(s) on
>>>>>>>>>>>>>>>>> >>>>>>>>>>    ReadAll but not the full specs.
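The Split / Reshuffle / Read shape from caveat 1 can be sketched in plain Java. This is a sketch under stated assumptions, not the SolrIO or HBaseIO code: `RangeSpec` is a hypothetical read spec over integer keys, a `List` stands in for a PCollection, and the "reshuffle" is only a local shuffle that mimics redistributing work.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Plain-Java stand-in for the Split -> Reshuffle -> Read shape; not Beam code. */
final class SplitShuffleRead {
  /** Hypothetical read spec covering the integer keys in [start, end). */
  static final class RangeSpec {
    final int start;
    final int end;

    RangeSpec(int start, int end) {
      this.start = start;
      this.end = end;
    }
  }

  /** "Split": break one spec into chunks of at most chunkSize keys. */
  static List<RangeSpec> split(RangeSpec spec, int chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    List<RangeSpec> parts = new ArrayList<>();
    for (int s = spec.start; s < spec.end; s += chunkSize) {
      parts.add(new RangeSpec(s, Math.min(s + chunkSize, spec.end)));
    }
    return parts;
  }

  /** "Reshuffle": reorder the chunks so they are no longer grouped by origin. */
  static List<RangeSpec> reshuffle(List<RangeSpec> parts, long seed) {
    List<RangeSpec> copy = new ArrayList<>(parts);
    Collections.shuffle(copy, new Random(seed));
    return copy;
  }

  /** "Read": emit one record per key in the chunk. */
  static List<Integer> read(RangeSpec part) {
    List<Integer> out = new ArrayList<>();
    for (int key = part.start; key < part.end; key++) {
      out.add(key);
    }
    return out;
  }

  /** The three stages chained together, as in the expand() method above. */
  static List<Integer> readAll(List<RangeSpec> specs, int chunkSize) {
    List<RangeSpec> parts = new ArrayList<>();
    for (RangeSpec spec : specs) {
      parts.addAll(split(spec, chunkSize));
    }
    List<Integer> out = new ArrayList<>();
    for (RangeSpec part : reshuffle(parts, 42L)) {
      out.addAll(read(part));
    }
    return out;
  }
}
```

Only `split` and `read` depend on the kind of source, which is why the outer shape is close to reusable between IOs.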
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow 
>>>>>>>>>>>>>>>>> >>>>>>>>>> this ReadAll pattern. RedisIO
>>>>>>>>>>>>>>>>> >>>>>>>>>> and CassandraIO have already WIP PRs to do so. So 
>>>>>>>>>>>>>>>>> >>>>>>>>>> I wanted to bring this subject
>>>>>>>>>>>>>>>>> >>>>>>>>>> to the mailing list to see your opinions, and if 
>>>>>>>>>>>>>>>>> >>>>>>>>>> you see any sort of issues that
>>>>>>>>>>>>>>>>> >>>>>>>>>> we might be missing with this idea.
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> Also I would like to see if we have consensus to 
>>>>>>>>>>>>>>>>> >>>>>>>>>> start consistently using the
>>>>>>>>>>>>>>>>> >>>>>>>>>> terminology of ReadAll transforms based on Read 
>>>>>>>>>>>>>>>>> >>>>>>>>>> and the readAll() method for new
>>>>>>>>>>>>>>>>> >>>>>>>>>> IOs (at this point probably undoing this in the 
>>>>>>>>>>>>>>>>> >>>>>>>>>> only remaining inconsistent
>>>>>>>>>>>>>>>>> >>>>>>>>>> place in JdbcIO might not be a good idea but apart 
>>>>>>>>>>>>>>>>> >>>>>>>>>> from this we should be ok).
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> I mention this because the recent PR on KafkaIO 
>>>>>>>>>>>>>>>>> >>>>>>>>>> based on SDF is doing something
>>>>>>>>>>>>>>>>> >>>>>>>>>> similar to the old pattern but is called 
>>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll, and maybe it is worth being
>>>>>>>>>>>>>>>>> >>>>>>>>>> consistent for the benefit of users.
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>> >>>>>>>>>> Ismaël
>>>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <lc...@google.com> 
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> > I had mentioned that approach 1 and approach 2 work for 
>>>>>>>>>>>>>>>>> > cross language. The difference being that the cross 
>>>>>>>>>>>>>>>>> > language transform would take a well known definition and 
>>>>>>>>>>>>>>>>> > convert it to the Read transform. A normal user would have 
>>>>>>>>>>>>>>>>> > a pipeline that would look like:
>>>>>>>>>>>>>>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> 
>>>>>>>>>>>>>>>>> > PCollection<Output>
>>>>>>>>>>>>>>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) -> 
>>>>>>>>>>>>>>>>> > PCollection<Output>
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> > And in the cross language case this would look like:
>>>>>>>>>>>>>>>>> > 1: PCollection<Row of SourceDescriptor> -> 
>>>>>>>>>>>>>>>>> > PTransform(Convert Row to Read) -> PCollection<Read> -> 
>>>>>>>>>>>>>>>>> > PTransform(ReadAll) -> PCollection<Output>
>>>>>>>>>>>>>>>>> > 2: PCollection<Row of SourceDescriptor> -> 
>>>>>>>>>>>>>>>>> > PTransform(Convert Row to SourceDescriptor) -> 
>>>>>>>>>>>>>>>>> > PCollection<SourceDescriptor> -> PTransform(ReadAll) -> 
>>>>>>>>>>>>>>>>> > PCollection<Output>*
>>>>>>>>>>>>>>>>> > * note that PTransform(Convert Row to SourceDescriptor) 
>>>>>>>>>>>>>>>>> > only exists since we haven't solved how to use schemas with 
>>>>>>>>>>>>>>>>> > language bound types in a cross language way. SchemaCoder 
>>>>>>>>>>>>>>>>> > isn't portable but RowCoder is which is why the conversion 
>>>>>>>>>>>>>>>>> > step exists. We could have a solution for this at some 
>>>>>>>>>>>>>>>>> > point in time.
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> > My concern with using Read was around:
>>>>>>>>>>>>>>>>> > a) Do all properties set on a Read apply to the ReadAll? 
>>>>>>>>>>>>>>>>> > For example, the Kafka Read implementation allows you to 
>>>>>>>>>>>>>>>>> > set the key and value deserializers which are also used to 
>>>>>>>>>>>>>>>>> > dictate the output PCollection type. It also allows you to 
>>>>>>>>>>>>>>>>> > set how the watermark should be computed. Technically a 
>>>>>>>>>>>>>>>>> > user may want the watermark computation to be configurable 
>>>>>>>>>>>>>>>>> > per Read and they may also want an output type which is 
>>>>>>>>>>>>>>>>> > polymorphic (e.g. PCollection<Serializable>).
>>>>>>>>>>>>>>>>> > b) Read extends PTransform which brings its own object 
>>>>>>>>>>>>>>>>> > modelling concerns.
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> > During the implementations of ReadAll(PCollection<Read>), 
>>>>>>>>>>>>>>>>> > was it discovered that some properties became runtime 
>>>>>>>>>>>>>>>>> > errors or were ignored if they were set? If no, then the 
>>>>>>>>>>>>>>>>> > code deduplication is likely worth it because we also get a 
>>>>>>>>>>>>>>>>> > lot of javadoc deduplication, but if yes is this an 
>>>>>>>>>>>>>>>>> > acceptable user experience?
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko 
>>>>>>>>>>>>>>>>> > <aromanenko....@gmail.com> wrote:
>>>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>>>> >> I believe that the initial goal of unifying ReadAll as a 
>>>>>>>>>>>>>>>>> >> general "PTransform<PCollection<Read>, 
>>>>>>>>>>>>>>>>> >> PCollection<OutputType>>" was to reduce the amount of code 
>>>>>>>>>>>>>>>>> >> duplication and the error-prone approach that comes with it. It 
>>>>>>>>>>>>>>>>> >> makes a lot of sense since we usually have all the needed 
>>>>>>>>>>>>>>>>> >> configuration set in Read objects and, as Ismaël 
>>>>>>>>>>>>>>>>> >> mentioned, ReadAll will consist mostly of only 
>>>>>>>>>>>>>>>>> >> Split-Shuffle-Read stages.  So this case can usually be 
>>>>>>>>>>>>>>>>> >> unified by using PCollection<Read> as input.
>>>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>>>> >> On the other hand, we have another need to use Java IOs as 
>>>>>>>>>>>>>>>>> >> cross-language transforms (as Luke described) which seems 
>>>>>>>>>>>>>>>>> >> to have only part in common with the previous pattern of ReadAll 
>>>>>>>>>>>>>>>>> >> usage.
>>>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>>>> >> I’d be more in favour of having only one concept of read 
>>>>>>>>>>>>>>>>> >> configuration for all needs but it seems that’s not easy, and 
>>>>>>>>>>>>>>>>> >> I’d be more in favour of Luke and Boyuan's approach with 
>>>>>>>>>>>>>>>>> >> schema. Though, maybe ReadAll is not a very suitable name 
>>>>>>>>>>>>>>>>> >> in this case because it can cause some confusion 
>>>>>>>>>>>>>>>>> >> with the previous pattern of ReadAll uses.
>>>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang 
>>>>>>>>>>>>>>>>> >> <boyu...@google.com> wrote:
>>>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>>>> >> Sorry for the typo. I mean I think we can go with (3) and 
>>>>>>>>>>>>>>>>> >> (4): use the data type that is schema-aware as the input 
>>>>>>>>>>>>>>>>> >> of ReadAll.
>>>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang 
>>>>>>>>>>>>>>>>> >> <boyu...@google.com> wrote:
>>>>>>>>>>>>>>>>> >>>
>>>>>>>>>>>>>>>>> >>> Thanks for the summary, Cham!
>>>>>>>>>>>>>>>>> >>>
>>>>>>>>>>>>>>>>> >>> I think we can go with (2) and (4): use the data type 
>>>>>>>>>>>>>>>>> >>> that is schema-aware as the input of ReadAll.
>>>>>>>>>>>>>>>>> >>>
>>>>>>>>>>>>>>>>> >>> Converting Read into ReadAll helps us to stick with 
>>>>>>>>>>>>>>>>> >>> SDF-like IO. But only having (3) is not enough to solve 
>>>>>>>>>>>>>>>>> >>> the problem of using ReadAll in the x-lang case.
>>>>>>>>>>>>>>>>> >>>
>>>>>>>>>>>>>>>>> >>> The key point of ReadAll is that the input type of 
>>>>>>>>>>>>>>>>> >>> ReadAll should be able to cross language boundaries and 
>>>>>>>>>>>>>>>>> >>> stay compatible when updating/downgrading. After 
>>>>>>>>>>>>>>>>> >>> investigating some possibilities (pure Java POJO with 
>>>>>>>>>>>>>>>>> >>> custom coder, protobuf, row/schema) in Kafka usage, we 
>>>>>>>>>>>>>>>>> >>> find that row/schema fits our needs best. Here comes (4). 
>>>>>>>>>>>>>>>>> >>> I believe that using Read as input of ReadAll makes sense 
>>>>>>>>>>>>>>>>> >>> in some cases, but I also think not all IOs have the same 
>>>>>>>>>>>>>>>>> >>> need. I would treat Read as a special type as long as the 
>>>>>>>>>>>>>>>>> >>> Read is schema-aware.
>>>>>>>>>>>>>>>>> >>>
>>>>>>>>>>>>>>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath 
>>>>>>>>>>>>>>>>> >>> <chamik...@google.com> wrote:
>>>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>>>> >>>> I see. So it seems like there are three options 
>>>>>>>>>>>>>>>>> >>>> discussed so far when it comes to defining source 
>>>>>>>>>>>>>>>>> >>>> descriptors for ReadAll type transforms
>>>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>>>> >>>> (1) Use Read PTransform as the element type of the input 
>>>>>>>>>>>>>>>>> >>>> PCollection
>>>>>>>>>>>>>>>>> >>>> (2) Use a POJO that describes the source as the data 
>>>>>>>>>>>>>>>>> >>>> element of the input PCollection
>>>>>>>>>>>>>>>>> >>>> (3) Provide a converter as a function to the Read 
>>>>>>>>>>>>>>>>> >>>> transform which essentially will convert it to a ReadAll 
>>>>>>>>>>>>>>>>> >>>> (what Eugene mentioned)
>>>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>>>> >>>> I feel like (3) is more suitable for a related set of 
>>>>>>>>>>>>>>>>> >>>> source descriptions such as files.
>>>>>>>>>>>>>>>>> >>>> (1) will allow the most code reuse but seems like it will make 
>>>>>>>>>>>>>>>>> >>>> it hard to use the ReadAll transform as a cross-language 
>>>>>>>>>>>>>>>>> >>>> transform and will break the separation of construction 
>>>>>>>>>>>>>>>>> >>>> time and runtime constructs
>>>>>>>>>>>>>>>>> >>>> (2) could result in less code reuse if not careful but 
>>>>>>>>>>>>>>>>> >>>> will make the transform easier to use as a 
>>>>>>>>>>>>>>>>> >>>> cross-language transform without additional modifications
>>>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>>>> >>>> Also, with SDF, we can create ReadAll-like transforms 
>>>>>>>>>>>>>>>>> >>>> that are more efficient. So we might be able to just 
>>>>>>>>>>>>>>>>> >>>> define all sources in that format and make Read 
>>>>>>>>>>>>>>>>> >>>> transforms just an easy to use composite built on top of 
>>>>>>>>>>>>>>>>> >>>> that (by adding a preceding Create transform).
>>>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>>>> >>>> Thanks,
>>>>>>>>>>>>>>>>> >>>> Cham
>>>>>>>>>>>>>>>>> >>>>
>>>>>>>>>>>>>>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik 
>>>>>>>>>>>>>>>>> >>>> <lc...@google.com> wrote:
>>>>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>>>>> >>>>> I believe we do require PTransforms to be serializable 
>>>>>>>>>>>>>>>>> >>>>> since anonymous DoFns typically capture the enclosing 
>>>>>>>>>>>>>>>>> >>>>> PTransform.
>>>>>>>>>>>>>>>>> >>>>>
>>>>>>>>>>>>>>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath 
>>>>>>>>>>>>>>>>> >>>>> <chamik...@google.com> wrote:
>>>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>>>> >>>>>> Seems like Read in PCollection<Read> refers to a 
>>>>>>>>>>>>>>>>> >>>>>> transform, at least here: 
>>>>>>>>>>>>>>>>> >>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>>>> >>>>>> I'm in favour of separating construction time 
>>>>>>>>>>>>>>>>> >>>>>> transforms from execution time data objects that we 
>>>>>>>>>>>>>>>>> >>>>>> store in PCollections as Luke mentioned. Also, we 
>>>>>>>>>>>>>>>>> >>>>>> don't guarantee that PTransform is serializable so 
>>>>>>>>>>>>>>>>> >>>>>> users have the additional complexity of providing a 
>>>>>>>>>>>>>>>>> >>>>>> coder whenever a PTransform is used as a data object.
>>>>>>>>>>>>>>>>> >>>>>> Also, agree with Boyuan that using simple Java objects 
>>>>>>>>>>>>>>>>> >>>>>> that are convertible to Beam Rows allows us to make 
>>>>>>>>>>>>>>>>> >>>>>> these transforms available to other SDKs through the 
>>>>>>>>>>>>>>>>> >>>>>> cross-language transforms. Using transforms or complex 
>>>>>>>>>>>>>>>>> >>>>>> sources as data objects will probably make this 
>>>>>>>>>>>>>>>>> >>>>>> difficult.
>>>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>>>> >>>>>> Thanks,
>>>>>>>>>>>>>>>>> >>>>>> Cham
>>>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>>>> >>>>>>
>>>>>>>>>>>>>>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang 
>>>>>>>>>>>>>>>>> >>>>>> <boyu...@google.com> wrote:
>>>>>>>>>>>>>>>>> >>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>> Hi Ismael,
>>>>>>>>>>>>>>>>> >>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>> I think the ReadAll in the IO connector refers to the 
>>>>>>>>>>>>>>>>> >>>>>>> IO with SDF implementation regardless of the type of input, 
>>>>>>>>>>>>>>>>> >>>>>>> where Read refers to UnboundedSource.  One major 
>>>>>>>>>>>>>>>>> >>>>>>> pushback of using KafkaIO.Read as source description 
>>>>>>>>>>>>>>>>> >>>>>>> is that not all configurations of KafkaIO.Read are 
>>>>>>>>>>>>>>>>> >>>>>>> meaningful to populate during execution time. Also 
>>>>>>>>>>>>>>>>> >>>>>>> when thinking about x-lang usage, making the source 
>>>>>>>>>>>>>>>>> >>>>>>> description work across language boundaries is also 
>>>>>>>>>>>>>>>>> >>>>>>> necessary.  As Luke mentioned, it's quite easy to 
>>>>>>>>>>>>>>>>> >>>>>>> infer a Schema from an AutoValue object: 
>>>>>>>>>>>>>>>>> >>>>>>> KafkaSourceDescription.java. Then the coder of this 
>>>>>>>>>>>>>>>>> >>>>>>> schema-aware object will be a SchemaCoder. When 
>>>>>>>>>>>>>>>>> >>>>>>> crossing language boundaries, it's also easy to 
>>>>>>>>>>>>>>>>> >>>>>>> convert a Row into the source description: 
>>>>>>>>>>>>>>>>> >>>>>>> Convert.fromRows.
>>>>>>>>>>>>>>>>> >>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik 
>>>>>>>>>>>>>>>>> >>>>>>> <lc...@google.com> wrote:
>>>>>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>> To provide additional context, the KafkaIO ReadAll 
>>>>>>>>>>>>>>>>> >>>>>>>> transform takes a 
>>>>>>>>>>>>>>>>> >>>>>>>> PCollection<KafkaSourceDescriptor>. This 
>>>>>>>>>>>>>>>>> >>>>>>>> KafkaSourceDescriptor is a POJO that contains the 
>>>>>>>>>>>>>>>>> >>>>>>>> configurable parameters for reading from Kafka. This 
>>>>>>>>>>>>>>>>> >>>>>>>> is different from the pattern that Ismael listed 
>>>>>>>>>>>>>>>>> >>>>>>>> because they take PCollection<Read> as input and the 
>>>>>>>>>>>>>>>>> >>>>>>>> Read is the same as the Read PTransform class used 
>>>>>>>>>>>>>>>>> >>>>>>>> for the non read all case.
>>>>>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>> The KafkaSourceDescriptor does lead to duplication 
>>>>>>>>>>>>>>>>> >>>>>>>> since parameters used to configure the transform 
>>>>>>>>>>>>>>>>> >>>>>>>> have to be copied over to the source descriptor but 
>>>>>>>>>>>>>>>>> >>>>>>>> decouples how a transform is specified from the 
>>>>>>>>>>>>>>>>> >>>>>>>> object that describes what needs to be done. I 
>>>>>>>>>>>>>>>>> >>>>>>>> believe Ismael's point is that we wouldn't need such 
>>>>>>>>>>>>>>>>> >>>>>>>> a decoupling.
>>>>>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>> Another area that hasn't been discussed and I 
>>>>>>>>>>>>>>>>> >>>>>>>> believe is a non-issue is that the Beam Java SDK has 
>>>>>>>>>>>>>>>>> >>>>>>>> the most IO connectors and we would want to use the 
>>>>>>>>>>>>>>>>> >>>>>>>> IO implementations within Beam Go and Beam Python. 
>>>>>>>>>>>>>>>>> >>>>>>>> This brings in its own set of issues related to 
>>>>>>>>>>>>>>>>> >>>>>>>> versioning and compatibility for the wire format and 
>>>>>>>>>>>>>>>>> >>>>>>>> how one parameterizes such transforms. The wire 
>>>>>>>>>>>>>>>>> >>>>>>>> format issue can be solved with either approach by 
>>>>>>>>>>>>>>>>> >>>>>>>> making sure that the cross language expansion always 
>>>>>>>>>>>>>>>>> >>>>>>>> takes the well known format (whatever it may be) and 
>>>>>>>>>>>>>>>>> >>>>>>>> converts it into Read/KafkaSourceDescriptor/... 
>>>>>>>>>>>>>>>>> >>>>>>>> object that is then passed to the ReadAll transform. 
>>>>>>>>>>>>>>>>> >>>>>>>> Boyuan has been looking to make the 
>>>>>>>>>>>>>>>>> >>>>>>>> KafkaSourceDescriptor have a schema so it can be 
>>>>>>>>>>>>>>>>> >>>>>>>> represented as a row and this can be done easily 
>>>>>>>>>>>>>>>>> >>>>>>>> using the AutoValue integration (I don't believe 
>>>>>>>>>>>>>>>>> >>>>>>>> there is anything preventing someone from writing a 
>>>>>>>>>>>>>>>>> >>>>>>>> schema row -> Read -> row adapter or also using the 
>>>>>>>>>>>>>>>>> >>>>>>>> AutoValue configuration if the transform is also an 
>>>>>>>>>>>>>>>>> >>>>>>>> AutoValue).
>>>>>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>> I would be more for the code duplication and 
>>>>>>>>>>>>>>>>> >>>>>>>> separation of concerns provided by using a different 
>>>>>>>>>>>>>>>>> >>>>>>>> object to represent the contents of the PCollection 
>>>>>>>>>>>>>>>>> >>>>>>>> from the pipeline construction time PTransform.
>>>>>>>>>>>>>>>>> >>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov 
>>>>>>>>>>>>>>>>> >>>>>>>> <kirpic...@google.com> wrote:
>>>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>> Hi Ismael,
>>>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>> Thanks for taking this on. Have you considered an 
>>>>>>>>>>>>>>>>> >>>>>>>>> approach similar (or dual) to FileIO.write(), where 
>>>>>>>>>>>>>>>>> >>>>>>>>> we in a sense also have to configure a dynamic 
>>>>>>>>>>>>>>>>> >>>>>>>>> number of different IO transforms of the same type 
>>>>>>>>>>>>>>>>> >>>>>>>>> (file writes)?
>>>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>> E.g. how in this example we configure many aspects 
>>>>>>>>>>>>>>>>> >>>>>>>>> of many file writes:
>>>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
>>>>>>>>>>>>>>>>> >>>>>>>>>      .by(Transaction::getType)
>>>>>>>>>>>>>>>>> >>>>>>>>>      // Convert the data to be written to CSVSink
>>>>>>>>>>>>>>>>> >>>>>>>>>      .via(tx -> tx.getType().toFields(tx),
>>>>>>>>>>>>>>>>> >>>>>>>>>           type -> new CSVSink(type.getFieldNames()))
>>>>>>>>>>>>>>>>> >>>>>>>>>      .to(".../path/to/")
>>>>>>>>>>>>>>>>> >>>>>>>>>      .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
>>>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>> we could do something similar for many JdbcIO reads:
>>>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>> // user-specific type from which all the read parameters can be inferred
>>>>>>>>>>>>>>>>> >>>>>>>>> PCollection<Bar> bars;
>>>>>>>>>>>>>>>>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>>>>>>>>>>>>>>>>> >>>>>>>>>   .fromQuery(bar -> ...compute query for this bar...)
>>>>>>>>>>>>>>>>> >>>>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
>>>>>>>>>>>>>>>>> >>>>>>>>>   .withBatchSize(bar -> ...compute batch size for this bar...)
>>>>>>>>>>>>>>>>> >>>>>>>>>   ...etc);
>>>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía 
>>>>>>>>>>>>>>>>> >>>>>>>>> <ieme...@gmail.com> wrote:
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> (my apologies for the long email but this requires 
>>>>>>>>>>>>>>>>> >>>>>>>>>> context)
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> As part of the move from Source-based IOs to DoFn-based
>>>>>>>>>>>>>>>>> >>>>>>>>>> ones, one pattern emerged due to the composable nature of
>>>>>>>>>>>>>>>>> >>>>>>>>>> DoFn. The idea is to have a different kind of composable
>>>>>>>>>>>>>>>>> >>>>>>>>>> read where we take a PCollection of different sorts of
>>>>>>>>>>>>>>>>> >>>>>>>>>> intermediate specifications, e.g. tables, queries, etc.
>>>>>>>>>>>>>>>>> >>>>>>>>>> For example:
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> JdbcIO:
>>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends
>>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<ParameterT>, 
>>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<OutputT>>
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> RedisIO:
>>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<String>, 
>>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<KV<String, String>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends 
>>>>>>>>>>>>>>>>> >>>>>>>>>> PTransform<PCollection<HBaseQuery>, 
>>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<Result>>
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> These patterns enabled richer use cases like doing 
>>>>>>>>>>>>>>>>> >>>>>>>>>> multiple queries in the same
>>>>>>>>>>>>>>>>> >>>>>>>>>> Pipeline, querying based on key patterns or 
>>>>>>>>>>>>>>>>> >>>>>>>>>> querying from multiple tables at the
>>>>>>>>>>>>>>>>> >>>>>>>>>> same time but came with some maintenance issues:
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> - We ended up needing to add parameters for missing
>>>>>>>>>>>>>>>>> >>>>>>>>>>   information to the ReadAll transforms, so we ended up
>>>>>>>>>>>>>>>>> >>>>>>>>>>   with lots of duplicated `with` methods and error-prone
>>>>>>>>>>>>>>>>> >>>>>>>>>>   code copied from the Read transforms into the ReadAll
>>>>>>>>>>>>>>>>> >>>>>>>>>>   transforms.
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> - When you require new parameters you have to expand the
>>>>>>>>>>>>>>>>> >>>>>>>>>>   input parameters of the intermediary specification into
>>>>>>>>>>>>>>>>> >>>>>>>>>>   something that resembles the full `Read` definition. For
>>>>>>>>>>>>>>>>> >>>>>>>>>>   example, imagine you want to read from multiple tables or
>>>>>>>>>>>>>>>>> >>>>>>>>>>   servers as part of the same pipeline, but this was not in
>>>>>>>>>>>>>>>>> >>>>>>>>>>   the intermediate specification: you end up adding those
>>>>>>>>>>>>>>>>> >>>>>>>>>>   extra methods (duplicating more code) just to get close
>>>>>>>>>>>>>>>>> >>>>>>>>>>   to the full Read spec.
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> - If new parameters are added to the Read method 
>>>>>>>>>>>>>>>>> >>>>>>>>>> we end up adding them
>>>>>>>>>>>>>>>>> >>>>>>>>>>   systematically to the ReadAll transform too so 
>>>>>>>>>>>>>>>>> >>>>>>>>>> they are taken into account.
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> Due to these issues I recently made a change to 
>>>>>>>>>>>>>>>>> >>>>>>>>>> test a new approach that is
>>>>>>>>>>>>>>>>> >>>>>>>>>> simpler, more complete and more maintainable. The code 
>>>>>>>>>>>>>>>>> >>>>>>>>>> became:
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> HBaseIO:
>>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>, 
>>>>>>>>>>>>>>>>> >>>>>>>>>> PCollection<Result>>
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> With this approach users benefit from any improvements to
>>>>>>>>>>>>>>>>> >>>>>>>>>> the parameters of the normal Read because they have the
>>>>>>>>>>>>>>>>> >>>>>>>>>> full Read parameters available. But of course there are
>>>>>>>>>>>>>>>>> >>>>>>>>>> some minor caveats:
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> 1. You need to push some information into normal Reads,
>>>>>>>>>>>>>>>>> >>>>>>>>>>    for example partition boundary information or
>>>>>>>>>>>>>>>>> >>>>>>>>>>    Restriction information (in the SDF case). Notice that
>>>>>>>>>>>>>>>>> >>>>>>>>>>    this consistent approach of ReadAll produces a simple
>>>>>>>>>>>>>>>>> >>>>>>>>>>    pattern that ends up being almost reusable between IOs
>>>>>>>>>>>>>>>>> >>>>>>>>>>    (e.g. the non-SDF case):
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>>   public static class ReadAll
>>>>>>>>>>>>>>>>> >>>>>>>>>>       extends PTransform<PCollection<Read>, PCollection<SolrDocument>> {
>>>>>>>>>>>>>>>>> >>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>> >>>>>>>>>>     public PCollection<SolrDocument> expand(PCollection<Read> input) {
>>>>>>>>>>>>>>>>> >>>>>>>>>>       return input
>>>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>>>>>>>>>>>>>>>>> >>>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>>>>>>>>>>>>>>>> >>>>>>>>>>     }
>>>>>>>>>>>>>>>>> >>>>>>>>>>   }
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> 2. If you are using generic types for the results of
>>>>>>>>>>>>>>>>> >>>>>>>>>>    ReadAll, you must have the Coders used in its definition
>>>>>>>>>>>>>>>>> >>>>>>>>>>    and require consistent types from the data sources. In
>>>>>>>>>>>>>>>>> >>>>>>>>>>    practice this means we need to add extra withCoder
>>>>>>>>>>>>>>>>> >>>>>>>>>>    method(s) on ReadAll, but not the full specs.
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow this
>>>>>>>>>>>>>>>>> >>>>>>>>>> ReadAll pattern. RedisIO and CassandraIO already have WIP
>>>>>>>>>>>>>>>>> >>>>>>>>>> PRs to do so. So I wanted to bring this subject to the
>>>>>>>>>>>>>>>>> >>>>>>>>>> mailing list to hear your opinions, and to learn whether
>>>>>>>>>>>>>>>>> >>>>>>>>>> you see any issues that we might be missing with this idea.
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> Also I would like to see if we have consensus to start
>>>>>>>>>>>>>>>>> >>>>>>>>>> consistently using the terminology of ReadAll transforms
>>>>>>>>>>>>>>>>> >>>>>>>>>> based on Read and the readAll() method for new IOs (at
>>>>>>>>>>>>>>>>> >>>>>>>>>> this point undoing this in the only remaining
>>>>>>>>>>>>>>>>> >>>>>>>>>> inconsistent place, JdbcIO, might not be a good idea, but
>>>>>>>>>>>>>>>>> >>>>>>>>>> apart from this we should be ok).
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> I mention this because the recent PR on KafkaIO based on
>>>>>>>>>>>>>>>>> >>>>>>>>>> SDF is doing something similar to the old pattern but is
>>>>>>>>>>>>>>>>> >>>>>>>>>> called ReadAll, and maybe it is worth being consistent
>>>>>>>>>>>>>>>>> >>>>>>>>>> for the benefit of users.
>>>>>>>>>>>>>>>>> >>>>>>>>>>
>>>>>>>>>>>>>>>>> >>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>> >>>>>>>>>> Ismaël
>>>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>>>> >>
>>>>>>>>>>>>>>>>>
