On Fri, Jun 26, 2020 at 11:49 AM Luke Cwik <[email protected]> wrote:

> I would also like to suggest that transforms that implement ReadAll via
> Read should also provide methods like:
>
> // Uses the specified values if unspecified in the input element from the
> PCollection<Read>.
> withDefaults(Read read);
> // Uses the specified values regardless of what the input element from the
> PCollection<Read> specifies.
> withOverrides(Read read);
>
> and only adds methods that are required at construction time (e.g.
> coders). This way the majority of documentation sits on the Read transform.
>

+0 from me. Sounds like benefits outweigh the drawbacks here and some of
the drawbacks related to cross-language can be overcome through future
advancements.
Thanks for bringing this up Ismaël.

- Cham


>
> On Fri, Jun 26, 2020 at 9:58 AM Luke Cwik <[email protected]> wrote:
>
>> Ismael, it is good to hear that using Read as the input didn't have a
>> bunch of parameters that were being skipped/ignored. Also, for the
>> polymorphism issue you have to rely on the user correctly telling you the
>> type in such a way where it is a common ancestor of all the runtime types
>> that will ever be used. This usually boils down to something like
>> Serializable or DynamicMessage such that the coder that is chosen works for
>> all the runtime types. Using multiple types is a valid use case and would
>> allow for a simpler graph with less flattens merging the output from
>> multiple sources.
>>
>> Boyuan, as you have mentioned we can have a coder for KafkaIO.Read which
>> uses schemas even if some of the parameters can't be represented in a
>> meaningful way beyond "bytes". This would be helpful for cross language as
>> well since every parameter would become available if a language could
>> support it (e.g. it could serialize a java function up front and keep it
>> saved as raw bytes within said language). Even if we figure out a better
>> way to do this in the future, we'll have to change the schema for the new
>> way anyway. This would mean that the external version of the transform
>> adopts Row to Read and we drop KafkaSourceDescriptor. The conversion from
>> Row to Read could validate that the parameters make sense (e.g. the bytes
>> are valid serialized functions). The addition of an
>> endReadTime/endReadOffset would make sense for KafkaIO.Read as well and
>> this would enable having a bounded version that could be used for backfills
>> (this doesn't have to be done as part of any current ongoing PR).
>> Essentially any parameter that could be added for a single instance of a
>> Kafka element+restriction would also make sense to the KafkaIO.Read
>> transform since it too is a single instance. There are parameters that
>> would apply to the ReadAll that wouldn't apply to a read and these would be
>> global parameters across all element+restriction pairs such as config
>> overrides or default values.
>>
>> I am convinced that we should do as Ismael is suggesting and use
>> KafkaIO.Read as the type.
>>
>>
>> On Thu, Jun 25, 2020 at 6:00 PM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>> Discussion regarding cross-language transforms is a slight tangent here.
>>> But I think, in general, it's great if we can use existing transforms (for
>>> example, IO connectors) as cross-language transforms without having to
>>> build more composites (irrespective of whether in ExternalTransformBuilders
>>> or a user pipelines) just to make them cross-language compatible. A future
>>> cross-language compatible SchemaCoder might help (assuming that works for
>>> Read transform) but I'm not sure we have a good idea when we'll get to that
>>> state.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang <[email protected]> wrote:
>>>
>>>> For unbounded SDF in Kafka, we also consider the upgrading/downgrading
>>>> compatibility in the pipeline update scenario(For detailed discussion,
>>>> please refer to
>>>> https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E).
>>>> In order to obtain the compatibility, it requires the input of the read SDF
>>>> is schema-aware.
>>>>
>>>> Thus the major constraint of mapping KafkaSourceDescriptor to
>>>> PCollection<Read> is, the KafkaIO.Read also needs to be schema-aware,
>>>> otherwise pipeline updates might fail unnecessarily. If looking into
>>>> KafkaIO.Read, not all necessary fields are compatible with schema, for
>>>> example, SerializedFunction.
>>>>
>>>> I'm kind of confused by why ReadAll<Read, OutputT> is a common pattern
>>>> for SDF based IO. The Read can be a common pattern because the input is
>>>> always a PBegin. But for an SDF based IO, the input can be anything. By
>>>> using Read as input, we will still have the maintenance cost when SDF IO
>>>> supports a new field but Read doesn't consume it. For example, we are
>>>> discussing adding endOffset and endReadTime to KafkaSourceDescriptior,
>>>> which is not used in KafkaIO.Read.
>>>>
>>>> On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía <[email protected]> wrote:
>>>>
>>>>> We forgot to mention (5) External.Config used in cross-lang, see
>>>>> KafkaIO
>>>>> ExternalTransformBuilder. This approach is the predecessor of (4) and
>>>>> probably a
>>>>> really good candidate to be replaced by the Row based Configuration
>>>>> Boyuan is
>>>>> envisioning (so good to be aware of this).
>>>>>
>>>>> Thanks for the clear explanation Luke you mention the real issue(s).
>>>>> All the
>>>>> approaches discussed so far in the end could be easily transformed to
>>>>> produce a
>>>>> PCollection<Read> and those Read Elements could be read by the generic
>>>>> ReadAll
>>>>> transform. Notice that this can be internal in some IOs e.g. KafkaIO
>>>>> if they
>>>>> decide not to expose it. I am not saying that we should force every IO
>>>>> to
>>>>> support ReadAll in its public API but if we do it is probably a good
>>>>> idea to be
>>>>> consistent with naming the transform that expects an input
>>>>> PCollection<Read> in
>>>>> the same way. Also notice that using it will save us of the
>>>>> maintenance issues
>>>>> discussed in my previous email.
>>>>>
>>>>> Back to the main concern: the consequences of expansion based on Read:
>>>>> So far I
>>>>> have not seen consequences for the Splitting part which maps really
>>>>> nice
>>>>> assuming the Partition info / Restriction is available as part of
>>>>> Read. So far
>>>>> there are not Serialization because Beam is already enforcing this.
>>>>> Notice that
>>>>> ReadAll expansion is almost ‘equivalent’ to a poor man SDF at least
>>>>> for the
>>>>> Bounded case (see the code in my previous email). For the other points:
>>>>>
>>>>> > a) Do all properties set on a Read apply to the ReadAll? For
>>>>> example, the
>>>>> > Kafka Read implementation allows you to set the key and value
>>>>> deserializers
>>>>> > which are also used to dictate the output PCollection type. It also
>>>>> allows you
>>>>> > to set how the watermark should be computed. Technically a user may
>>>>> want the
>>>>> > watermark computation to be configurable per Read and they may also
>>>>> want an
>>>>> > output type which is polymorphic (e.g. Pcollection<Serializable>).
>>>>>
>>>>> Most of the times they do but for parametric types we cannot support
>>>>> different
>>>>> types in the outputs of the Read or at least I did not find how to do
>>>>> so (is
>>>>> there a way to use multiple output Coders on Beam?), we saw this in
>>>>> CassandraIO
>>>>> and we were discussing adding explicitly these Coders or Serializer
>>>>> specific methods to the ReadAll transform. This is less nice because
>>>>> it will
>>>>> imply some repeated methods, but it is still a compromise to gain the
>>>>> other
>>>>> advantages. I suppose the watermark case you mention is similar
>>>>> because you may
>>>>> want the watermark to behave differently in each Read and we probably
>>>>> don’t
>>>>> support this, so it corresponds to the polymorphic category.
>>>>>
>>>>> > b) Read extends PTransform which brings its own object modelling
>>>>> concerns.
>>>>>
>>>>> > During the implementations of ReadAll(PCollection<Read>), was it
>>>>> discovered
>>>>> > that some properties became runtime errors or were ignored if they
>>>>> were set?
>>>>> > If no, then the code deduplication is likely worth it because we
>>>>> also get a
>>>>> > lot of javadoc deduplication, but if yes is this an acceptable user
>>>>> > experience?
>>>>>
>>>>> No, not so far. This is an interesting part, notice that the Read
>>>>> translation
>>>>> ends up delegating the read bits to the ReadFn part of ReadAll so the
>>>>> ReadFn is
>>>>> the real read and must be aware and use all the parameters.
>>>>>
>>>>>     @Override
>>>>>     public PCollection<SolrDocument> expand(PBegin input) {
>>>>>       return input.apply("Create", Create.of(this)).apply("ReadAll",
>>>>> readAll());
>>>>>     }
>>>>>
>>>>> I might be missing something for the Unbounded SDF case which is the
>>>>> only case
>>>>> we have not explored so far. I think one easy way to see the
>>>>> limitations would
>>>>> be in the ongoing KafkaIO SDF based implementation to try to map
>>>>> KafkaSourceDescriptor to do the extra PCollection<Read> and the Read
>>>>> logic on
>>>>> the ReadAll with the SDF to see which constraints we hit, the
>>>>> polymorphic ones
>>>>> will be there for sure, maybe others will appear (not sure). However
>>>>> it would be
>>>>> interesting to see if we have a real gain in the maintenance points,
>>>>> but well
>>>>> let’s not forget also that KafkaIO has a LOT of knobs so probably the
>>>>> generic
>>>>> implementation could be relatively complex.
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <[email protected]> wrote:
>>>>> >
>>>>> > I had mentioned that approach 1 and approach 2 work for cross
>>>>> language. The difference being that the cross language transform would 
>>>>> take
>>>>> a well known definition and convert it to the Read transform. A normal 
>>>>> user
>>>>> would have a pipeline that would look like:
>>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
>>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) ->
>>>>> PCollection<Output>
>>>>> >
>>>>> > And in the cross language case this would look like:
>>>>> > 1: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to
>>>>> Read) -> PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
>>>>> > 2: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to
>>>>> SourceDescriptor) -> PCollection<SourceDescriptor> -> PTransform(ReadAll)
>>>>> -> PCollection<Output>*
>>>>> > * note that PTransform(Convert Row to SourceDescriptor) only exists
>>>>> since we haven't solved how to use schemas with language bound types in a
>>>>> cross language way. SchemaCoder isn't portable but RowCoder is which is 
>>>>> why
>>>>> the conversion step exists. We could have a solution for this at some 
>>>>> point
>>>>> in time.
>>>>> >
>>>>> > My concern with using Read was around:
>>>>> > a) Do all properties set on a Read apply to the ReadAll? For
>>>>> example, the Kafka Read implementation allows you to set the key and value
>>>>> deserializers which are also used to dictate the output PCollection type.
>>>>> It also allows you to set how the watermark should be computed. 
>>>>> Technically
>>>>> a user may want the watermark computation to be configurable per Read and
>>>>> they may also want an output type which is polymorphic (e.g.
>>>>> PCollection<Serializable>).
>>>>> > b) Read extends PTransform which brings its own object modelling
>>>>> concerns.
>>>>> >
>>>>> > During the implementations of ReadAll(PCollection<Read>), was it
>>>>> discovered that some properties became runtime errors or were ignored if
>>>>> they were set? If no, then the code deduplication is likely worth it
>>>>> because we also get a lot of javadoc deduplication, but if yes is this an
>>>>> acceptable user experience?
>>>>> >
>>>>> >
>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <
>>>>> [email protected]> wrote:
>>>>> >>
>>>>> >> I believe that the initial goal of unifying ReadAll as a general
>>>>> "PTransform<PCollection<Read>, PCollection<OutputType>>” was to reduce the
>>>>> amount of code duplication and error-prone approach related to this. It
>>>>> makes much sense since usually we have all needed configuration set in 
>>>>> Read
>>>>> objects and, as Ismaeil mentioned, ReadAll will consist mostly of only
>>>>> Split-Shuffle-Read stages.  So this case usually can be unified by using
>>>>> PCollection<Read> as input.
>>>>> >>
>>>>> >> On the other hand, we have another need to use Java IOs as
>>>>> cross-language transforms (as Luke described) which seems only partly in
>>>>> common with previous pattern of ReadAll using.
>>>>> >>
>>>>> >> I’d be more in favour to have only one concept of read
>>>>> configuration for all needs but seems it’s not easy and I’d be more in
>>>>> favour with Luke and Boyuan approach with schema. Though, maybe ReadAll is
>>>>> not a very suitable name in this case because it will can bring some
>>>>> confusions related to previous pattern of ReadAll uses.
>>>>> >>
>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang <[email protected]> wrote:
>>>>> >>
>>>>> >> Sorry for the typo. I mean I think we can go with (3) and (4): use
>>>>> the data type that is schema-aware as the input of ReadAll.
>>>>> >>
>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <[email protected]>
>>>>> wrote:
>>>>> >>>
>>>>> >>> Thanks for the summary, Cham!
>>>>> >>>
>>>>> >>> I think we can go with (2) and (4): use the data type that is
>>>>> schema-aware as the input of ReadAll.
>>>>> >>>
>>>>> >>> Converting Read into ReadAll helps us to stick with SDF-like IO.
>>>>> But only having  (3) is not enough to solve the problem of using ReadAll 
>>>>> in
>>>>> x-lang case.
>>>>> >>>
>>>>> >>> The key point of ReadAll is that the input type of ReadAll should
>>>>> be able to cross language boundaries and have compatibilities of
>>>>> updating/downgrading. After investigating some possibilities(pure java 
>>>>> pojo
>>>>> with custom coder, protobuf, row/schema) in Kafka usage, we find that
>>>>> row/schema fits our needs most. Here comes (4). I believe that using Read
>>>>> as input of ReadAll makes sense in some cases, but I also think not all 
>>>>> IOs
>>>>> have the same need. I would treat Read as a special type as long as the
>>>>> Read is schema-aware.
>>>>> >>>
>>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <
>>>>> [email protected]> wrote:
>>>>> >>>>
>>>>> >>>> I see. So it seems like there are three options discussed so far
>>>>> when it comes to defining source descriptors for ReadAll type transforms
>>>>> >>>>
>>>>> >>>> (1) Use Read PTransform as the element type of the input
>>>>> PCollection
>>>>> >>>> (2) Use a POJO that describes the source as the data element of
>>>>> the input PCollection
>>>>> >>>> (3) Provide a converter as a function to the Read transform which
>>>>> essentially will convert it to a ReadAll (what Eugene mentioned)
>>>>> >>>>
>>>>> >>>> I feel like (3) is more suitable for a related set of source
>>>>> descriptions such as files.
>>>>> >>>> (1) will allow most code-reuse but seems like will make it hard
>>>>> to use the ReadAll transform as a cross-language transform and will break
>>>>> the separation of construction time and runtime constructs
>>>>> >>>> (2) could result to less code reuse if not careful but will make
>>>>> the transform easier to be used as a cross-language transform without
>>>>> additional modifications
>>>>> >>>>
>>>>> >>>> Also, with SDF, we can create ReadAll-like transforms that are
>>>>> more efficient. So we might be able to just define all sources in that
>>>>> format and make Read transforms just an easy to use composite built on top
>>>>> of that (by adding a preceding Create transform).
>>>>> >>>>
>>>>> >>>> Thanks,
>>>>> >>>> Cham
>>>>> >>>>
>>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <[email protected]>
>>>>> wrote:
>>>>> >>>>>
>>>>> >>>>> I believe we do require PTransforms to be serializable since
>>>>> anonymous DoFns typically capture the enclosing PTransform.
>>>>> >>>>>
>>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <
>>>>> [email protected]> wrote:
>>>>> >>>>>>
>>>>> >>>>>> Seems like Read in PCollection<Read> refers to a transform, at
>>>>> least here:
>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>>> >>>>>>
>>>>> >>>>>> I'm in favour of separating construction time transforms from
>>>>> execution time data objects that we store in PCollections as Luke
>>>>> mentioned. Also, we don't guarantee that PTransform is serializable so
>>>>> users have the additional complexity of providing a corder whenever a
>>>>> PTransform is used as a data object.
>>>>> >>>>>> Also, agree with Boyuan that using simple Java objects that are
>>>>> convertible to Beam Rows allow us to make these transforms available to
>>>>> other SDKs through the cross-language transforms. Using transforms or
>>>>> complex sources as data objects will probably make this difficult.
>>>>> >>>>>>
>>>>> >>>>>> Thanks,
>>>>> >>>>>> Cham
>>>>> >>>>>>
>>>>> >>>>>>
>>>>> >>>>>>
>>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <
>>>>> [email protected]> wrote:
>>>>> >>>>>>>
>>>>> >>>>>>> Hi Ismael,
>>>>> >>>>>>>
>>>>> >>>>>>> I think the ReadAll in the IO connector refers to the IO with
>>>>> SDF implementation despite the type of input, where Read refers to
>>>>> UnboundedSource.  One major pushback of using KafkaIO.Read as source
>>>>> description is that not all configurations of KafkaIO.Read are meaningful
>>>>> to populate during execution time. Also when thinking about x-lang useage,
>>>>> making source description across language boundaries is also necessary.  
>>>>> As
>>>>> Luke mentioned, it's quite easy to infer a Schema from an AutoValue 
>>>>> object:
>>>>> KafkaSourceDescription.java. Then the coder of this schema-aware object
>>>>> will be a SchemaCoder. When crossing language boundaries, it's also easy 
>>>>> to
>>>>> convert a Row into the source description: Convert.fromRows.
>>>>> >>>>>>>
>>>>> >>>>>>>
>>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <[email protected]>
>>>>> wrote:
>>>>> >>>>>>>>
>>>>> >>>>>>>> To provide additional context, the KafkaIO ReadAll transform
>>>>> takes a PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is 
>>>>> a
>>>>> POJO that contains the configurable parameters for reading from Kafka. 
>>>>> This
>>>>> is different from the pattern that Ismael listed because they take
>>>>> PCollection<Read> as input and the Read is the same as the Read PTransform
>>>>> class used for the non read all case.
>>>>> >>>>>>>>
>>>>> >>>>>>>> The KafkaSourceDescriptor does lead to duplication since
>>>>> parameters used to configure the transform have to be copied over to the
>>>>> source descriptor but decouples how a transform is specified from the
>>>>> object that describes what needs to be done. I believe Ismael's point is
>>>>> that we wouldn't need such a decoupling.
>>>>> >>>>>>>>
>>>>> >>>>>>>> Another area that hasn't been discussed and I believe is a
>>>>> non-issue is that the Beam Java SDK has the most IO connectors and we 
>>>>> would
>>>>> want to use the IO implementations within Beam Go and Beam Python. This
>>>>> brings in its own set of issues related to versioning and compatibility 
>>>>> for
>>>>> the wire format and how one parameterizes such transforms. The wire format
>>>>> issue can be solved with either approach by making sure that the cross
>>>>> language expansion always takes the well known format (whatever it may be)
>>>>> and converts it into Read/KafkaSourceDescriptor/... object that is then
>>>>> passed to the ReadAll transform. Boyuan has been looking to make the
>>>>> KafkaSourceDescriptor have a schema so it can be represented as a row and
>>>>> this can be done easily using the AutoValue integration (I don't believe
>>>>> there is anything preventing someone from writing a schema row -> Read ->
>>>>> row adapter or also using the AutoValue configuration if the transform is
>>>>> also an AutoValue).
>>>>> >>>>>>>>
>>>>> >>>>>>>> I would be more for the code duplication and separation of
>>>>> concerns provided by using a different object to represent the contents of
>>>>> the PCollection from the pipeline construction time PTransform.
>>>>> >>>>>>>>
>>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <
>>>>> [email protected]> wrote:
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> Hi Ismael,
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> Thanks for taking this on. Have you considered an approach
>>>>> similar (or dual) to FileIO.write(), where we in a sense also have to
>>>>> configure a dynamic number different IO transforms of the same type (file
>>>>> writes)?
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> E.g. how in this example we configure many aspects of many
>>>>> file writes:
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType,
>>>>> Transaction>writeDynamic()
>>>>> >>>>>>>>>      .by(Transaction::getType)
>>>>> >>>>>>>>>      .via(tx -> tx.getType().toFields(tx),  // Convert the
>>>>> data to be written to CSVSink
>>>>> >>>>>>>>>           type -> new CSVSink(type.getFieldNames()))
>>>>> >>>>>>>>>      .to(".../path/to/")
>>>>> >>>>>>>>>      .withNaming(type -> defaultNaming(type +
>>>>> "-transactions", ".csv"));
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> we could do something similar for many JdbcIO reads:
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> PCollection<Bar> bars;  // user-specific type from which all
>>>>> the read parameters can be inferred
>>>>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>>>>> >>>>>>>>>   .fromQuery(bar -> ...compute query for this bar...)
>>>>> >>>>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
>>>>> >>>>>>>>>   .withBatchSize(bar -> ...compute batch size for this
>>>>> bar...)
>>>>> >>>>>>>>>   ...etc);
>>>>> >>>>>>>>>
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <
>>>>> [email protected]> wrote:
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Hello,
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> (my excuses for the long email but this requires context)
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> As part of the move from Source based IOs to DoFn based
>>>>> ones. One pattern
>>>>> >>>>>>>>>> emerged due to the composable nature of DoFn. The idea is
>>>>> to have a different
>>>>> >>>>>>>>>> kind of composable reads where we take a PCollection of
>>>>> different sorts of
>>>>> >>>>>>>>>> intermediate specifications e.g. tables, queries, etc, for
>>>>> example:
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> JdbcIO:
>>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends
>>>>> >>>>>>>>>> PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> RedisIO:
>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<String>,
>>>>> PCollection<KV<String, String>>>
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> HBaseIO:
>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>,
>>>>> PCollection<Result>>
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> These patterns enabled richer use cases like doing multiple
>>>>> queries in the same
>>>>> >>>>>>>>>> Pipeline, querying based on key patterns or querying from
>>>>> multiple tables at the
>>>>> >>>>>>>>>> same time but came with some maintenance issues:
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> - We ended up needing to add to the ReadAll transforms the
>>>>> parameters for
>>>>> >>>>>>>>>>   missing information so we ended up with lots of
>>>>> duplicated with methods and
>>>>> >>>>>>>>>>   error-prone code from the Read transforms into the
>>>>> ReadAll transforms.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> - When you require new parameters you have to expand the
>>>>> input parameters of the
>>>>> >>>>>>>>>>   intermediary specification into something that resembles
>>>>> the full `Read`
>>>>> >>>>>>>>>>   definition for example imagine you want to read from
>>>>> multiple tables or
>>>>> >>>>>>>>>>   servers as part of the same pipeline but this was not in
>>>>> the intermediate
>>>>> >>>>>>>>>>   specification you end up adding those extra methods
>>>>> (duplicating more code)
>>>>> >>>>>>>>>>   just o get close to the be like the Read full spec.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> - If new parameters are added to the Read method we end up
>>>>> adding them
>>>>> >>>>>>>>>>   systematically to the ReadAll transform too so they are
>>>>> taken into account.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Due to these issues I recently did a change to test a new
>>>>> approach that is
>>>>> >>>>>>>>>> simpler, more complete and maintainable. The code became:
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> HBaseIO:
>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>,
>>>>> PCollection<Result>>
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> With this approach users gain benefits of improvements on
>>>>> parameters of normal
>>>>> >>>>>>>>>> Read because they count with the full Read parameters. But
>>>>> of course there are
>>>>> >>>>>>>>>> some minor caveats:
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> 1. You need to push some information into normal Reads for
>>>>> example
>>>>> >>>>>>>>>>    partition boundaries information or Restriction
>>>>> information (in the SDF
>>>>> >>>>>>>>>>    case).  Notice that this consistent approach of ReadAll
>>>>> produces a simple
>>>>> >>>>>>>>>>    pattern that ends up being almost reusable between IOs
>>>>> (e.g. the    non-SDF
>>>>> >>>>>>>>>>    case):
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>>   public static class ReadAll extends
>>>>> PTransform<PCollection<Read>,
>>>>> >>>>>>>>>> PCollection<SolrDocument>> {
>>>>> >>>>>>>>>>     @Override
>>>>> >>>>>>>>>>     public PCollection<SolrDocument>
>>>>> expand(PCollection<Read> input) {
>>>>> >>>>>>>>>>       return input
>>>>> >>>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>>>> >>>>>>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>>>>> >>>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>>>> >>>>>>>>>>     }
>>>>> >>>>>>>>>>   }
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> 2. If you are using Generic types for the results ReadAll
>>>>> you must have the
>>>>> >>>>>>>>>>    Coders used in its definition and require consistent
>>>>> types from the data
>>>>> >>>>>>>>>>    sources, in practice this means we need to add extra
>>>>> withCoder method(s) on
>>>>> >>>>>>>>>>    ReadAll but not the full specs.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow this
>>>>> ReadAll pattern. RedisIO
>>>>> >>>>>>>>>> and CassandraIO have already WIP PRs to do so. So I wanted
>>>>> to bring this subject
>>>>> >>>>>>>>>> to the mailing list to see your opinions, and if you see
>>>>> any sort of issues that
>>>>> >>>>>>>>>> we might be missing with this idea.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Also I would like to see if we have consensus to start
>>>>> using consistently the
>>>>> >>>>>>>>>> terminology of ReadAll transforms based on Read and the
>>>>> readAll() method for new
>>>>> >>>>>>>>>> IOs (at this point probably outdoing this in the only
>>>>> remaining inconsistent
>>>>> >>>>>>>>>> place in JdbcIO might not be a good idea but apart of this
>>>>> we should be ok).
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> I mention this because the recent PR on KafkaIO based on
>>>>> SDF is doing something
>>>>> >>>>>>>>>> similar to the old pattern but being called ReadAll and
>>>>> maybe it is worth to be
>>>>> >>>>>>>>>> consistent for the benefit of users.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Regards,
>>>>> >>>>>>>>>> Ismaël
>>>>> >>
>>>>> >>
>>>>>
>>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <[email protected]> wrote:
>>>>> >
>>>>> > I had mentioned that approach 1 and approach 2 work for cross
>>>>> language. The difference being that the cross language transform would 
>>>>> take
>>>>> a well known definition and convert it to the Read transform. A normal 
>>>>> user
>>>>> would have a pipeline that would look like:
>>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
>>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) ->
>>>>> PCollection<Output>
>>>>> >
>>>>> > And in the cross language case this would look like:
>>>>> > 1: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to
>>>>> Read) -> PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
>>>>> > 2: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to
>>>>> SourceDescriptor) -> PCollection<SourceDescriptor> -> PTransform(ReadAll)
>>>>> -> PCollection<Output>*
>>>>> > * note that PTransform(Convert Row to SourceDescriptor) only exists
>>>>> since we haven't solved how to use schemas with language bound types in a
>>>>> cross language way. SchemaCoder isn't portable but RowCoder is which is 
>>>>> why
>>>>> the conversion step exists. We could have a solution for this at some 
>>>>> point
>>>>> in time.
>>>>> >
>>>>> > My concern with using Read was around:
>>>>> > a) Do all properties set on a Read apply to the ReadAll? For
>>>>> example, the Kafka Read implementation allows you to set the key and value
>>>>> deserializers which are also used to dictate the output PCollection type.
>>>>> It also allows you to set how the watermark should be computed. 
>>>>> Technically
>>>>> a user may want the watermark computation to be configurable per Read and
>>>>> they may also want an output type which is polymorphic (e.g.
>>>>> PCollection<Serializable>).
>>>>> > b) Read extends PTransform which brings its own object modelling
>>>>> concerns.
>>>>> >
>>>>> > During the implementations of ReadAll(PCollection<Read>), was it
>>>>> discovered that some properties became runtime errors or were ignored if
>>>>> they were set? If no, then the code deduplication is likely worth it
>>>>> because we also get a lot of javadoc deduplication, but if yes is this an
>>>>> acceptable user experience?
>>>>> >
>>>>> >
>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <
>>>>> [email protected]> wrote:
>>>>> >>
>>>>> >> I believe that the initial goal of unifying ReadAll as a general
>>>>> "PTransform<PCollection<Read>, PCollection<OutputType>>” was to reduce the
>>>>> amount of code duplication and error-prone approach related to this. It
>>>>> makes much sense since usually we have all needed configuration set in 
>>>>> Read
>>>>> objects and, as Ismaeil mentioned, ReadAll will consist mostly of only
>>>>> Split-Shuffle-Read stages.  So this case usually can be unified by using
>>>>> PCollection<Read> as input.
>>>>> >>
>>>>> >> On the other hand, we have another need to use Java IOs as
>>>>> cross-language transforms (as Luke described) which seems only partly in
>>>>> common with previous pattern of ReadAll using.
>>>>> >>
>>>>> >> I’d be more in favour to have only one concept of read
>>>>> configuration for all needs but seems it’s not easy and I’d be more in
>>>>> favour with Luke and Boyuan approach with schema. Though, maybe ReadAll is
>>>>> not a very suitable name in this case because it will can bring some
>>>>> confusions related to previous pattern of ReadAll uses.
>>>>> >>
>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang <[email protected]> wrote:
>>>>> >>
>>>>> >> Sorry for the typo. I mean I think we can go with (3) and (4): use
>>>>> the data type that is schema-aware as the input of ReadAll.
>>>>> >>
>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <[email protected]>
>>>>> wrote:
>>>>> >>>
>>>>> >>> Thanks for the summary, Cham!
>>>>> >>>
>>>>> >>> I think we can go with (2) and (4): use the data type that is
>>>>> schema-aware as the input of ReadAll.
>>>>> >>>
>>>>> >>> Converting Read into ReadAll helps us to stick with SDF-like IO.
>>>>> But only having  (3) is not enough to solve the problem of using ReadAll 
>>>>> in
>>>>> x-lang case.
>>>>> >>>
>>>>> >>> The key point of ReadAll is that the input type of ReadAll should
>>>>> be able to cross language boundaries and have compatibilities of
>>>>> updating/downgrading. After investigating some possibilities(pure java 
>>>>> pojo
>>>>> with custom coder, protobuf, row/schema) in Kafka usage, we find that
>>>>> row/schema fits our needs most. Here comes (4). I believe that using Read
>>>>> as input of ReadAll makes sense in some cases, but I also think not all 
>>>>> IOs
>>>>> have the same need. I would treat Read as a special type as long as the
>>>>> Read is schema-aware.
>>>>> >>>
>>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <
>>>>> [email protected]> wrote:
>>>>> >>>>
>>>>> >>>> I see. So it seems like there are three options discussed so far
>>>>> when it comes to defining source descriptors for ReadAll type transforms
>>>>> >>>>
>>>>> >>>> (1) Use Read PTransform as the element type of the input
>>>>> PCollection
>>>>> >>>> (2) Use a POJO that describes the source as the data element of
>>>>> the input PCollection
>>>>> >>>> (3) Provide a converter as a function to the Read transform which
>>>>> essentially will convert it to a ReadAll (what Eugene mentioned)
>>>>> >>>>
>>>>> >>>> I feel like (3) is more suitable for a related set of source
>>>>> descriptions such as files.
>>>>> >>>> (1) will allow most code-reuse but seems like will make it hard
>>>>> to use the ReadAll transform as a cross-language transform and will break
>>>>> the separation of construction time and runtime constructs
>>>>> >>>> (2) could result to less code reuse if not careful but will make
>>>>> the transform easier to be used as a cross-language transform without
>>>>> additional modifications
>>>>> >>>>
>>>>> >>>> Also, with SDF, we can create ReadAll-like transforms that are
>>>>> more efficient. So we might be able to just define all sources in that
>>>>> format and make Read transforms just an easy to use composite built on top
>>>>> of that (by adding a preceding Create transform).
>>>>> >>>>
>>>>> >>>> Thanks,
>>>>> >>>> Cham
>>>>> >>>>
>>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <[email protected]>
>>>>> wrote:
>>>>> >>>>>
>>>>> >>>>> I believe we do require PTransforms to be serializable since
>>>>> anonymous DoFns typically capture the enclosing PTransform.
>>>>> >>>>>
>>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <
>>>>> [email protected]> wrote:
>>>>> >>>>>>
>>>>> >>>>>> Seems like Read in PCollection<Read> refers to a transform, at
>>>>> least here:
>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>>> >>>>>>
>>>>> >>>>>> I'm in favour of separating construction time transforms from
>>>>> execution time data objects that we store in PCollections as Luke
>>>>> mentioned. Also, we don't guarantee that PTransform is serializable so
>>>>> users have the additional complexity of providing a corder whenever a
>>>>> PTransform is used as a data object.
>>>>> >>>>>> Also, agree with Boyuan that using simple Java objects that are
>>>>> convertible to Beam Rows allow us to make these transforms available to
>>>>> other SDKs through the cross-language transforms. Using transforms or
>>>>> complex sources as data objects will probably make this difficult.
>>>>> >>>>>>
>>>>> >>>>>> Thanks,
>>>>> >>>>>> Cham
>>>>> >>>>>>
>>>>> >>>>>>
>>>>> >>>>>>
>>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <
>>>>> [email protected]> wrote:
>>>>> >>>>>>>
>>>>> >>>>>>> Hi Ismael,
>>>>> >>>>>>>
>>>>> >>>>>>> I think the ReadAll in the IO connector refers to the IO with
>>>>> SDF implementation despite the type of input, where Read refers to
>>>>> UnboundedSource.  One major pushback of using KafkaIO.Read as source
>>>>> description is that not all configurations of KafkaIO.Read are meaningful
>>>>> to populate during execution time. Also when thinking about x-lang useage,
>>>>> making source description across language boundaries is also necessary.  
>>>>> As
>>>>> Luke mentioned, it's quite easy to infer a Schema from an AutoValue 
>>>>> object:
>>>>> KafkaSourceDescription.java. Then the coder of this schema-aware object
>>>>> will be a SchemaCoder. When crossing language boundaries, it's also easy 
>>>>> to
>>>>> convert a Row into the source description: Convert.fromRows.
>>>>> >>>>>>>
>>>>> >>>>>>>
>>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <[email protected]>
>>>>> wrote:
>>>>> >>>>>>>>
>>>>> >>>>>>>> To provide additional context, the KafkaIO ReadAll transform
>>>>> takes a PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is 
>>>>> a
>>>>> POJO that contains the configurable parameters for reading from Kafka. 
>>>>> This
>>>>> is different from the pattern that Ismael listed because they take
>>>>> PCollection<Read> as input and the Read is the same as the Read PTransform
>>>>> class used for the non read all case.
>>>>> >>>>>>>>
>>>>> >>>>>>>> The KafkaSourceDescriptor does lead to duplication since
>>>>> parameters used to configure the transform have to be copied over to the
>>>>> source descriptor but decouples how a transform is specified from the
>>>>> object that describes what needs to be done. I believe Ismael's point is
>>>>> that we wouldn't need such a decoupling.
>>>>> >>>>>>>>
>>>>> >>>>>>>> Another area that hasn't been discussed and I believe is a
>>>>> non-issue is that the Beam Java SDK has the most IO connectors and we 
>>>>> would
>>>>> want to use the IO implementations within Beam Go and Beam Python. This
>>>>> brings in its own set of issues related to versioning and compatibility 
>>>>> for
>>>>> the wire format and how one parameterizes such transforms. The wire format
>>>>> issue can be solved with either approach by making sure that the cross
>>>>> language expansion always takes the well known format (whatever it may be)
>>>>> and converts it into Read/KafkaSourceDescriptor/... object that is then
>>>>> passed to the ReadAll transform. Boyuan has been looking to make the
>>>>> KafkaSourceDescriptor have a schema so it can be represented as a row and
>>>>> this can be done easily using the AutoValue integration (I don't believe
>>>>> there is anything preventing someone from writing a schema row -> Read ->
>>>>> row adapter or also using the AutoValue configuration if the transform is
>>>>> also an AutoValue).
>>>>> >>>>>>>>
>>>>> >>>>>>>> I would be more for the code duplication and separation of
>>>>> concerns provided by using a different object to represent the contents of
>>>>> the PCollection from the pipeline construction time PTransform.
>>>>> >>>>>>>>
>>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <
>>>>> [email protected]> wrote:
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> Hi Ismael,
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> Thanks for taking this on. Have you considered an approach
>>>>> similar (or dual) to FileIO.write(), where we in a sense also have to
>>>>> configure a dynamic number different IO transforms of the same type (file
>>>>> writes)?
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> E.g. how in this example we configure many aspects of many
>>>>> file writes:
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType,
>>>>> Transaction>writeDynamic()
>>>>> >>>>>>>>>      .by(Transaction::getType)
>>>>> >>>>>>>>>      .via(tx -> tx.getType().toFields(tx),  // Convert the
>>>>> data to be written to CSVSink
>>>>> >>>>>>>>>           type -> new CSVSink(type.getFieldNames()))
>>>>> >>>>>>>>>      .to(".../path/to/")
>>>>> >>>>>>>>>      .withNaming(type -> defaultNaming(type +
>>>>> "-transactions", ".csv"));
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> we could do something similar for many JdbcIO reads:
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> PCollection<Bar> bars;  // user-specific type from which all
>>>>> the read parameters can be inferred
>>>>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>>>>> >>>>>>>>>   .fromQuery(bar -> ...compute query for this bar...)
>>>>> >>>>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
>>>>> >>>>>>>>>   .withBatchSize(bar -> ...compute batch size for this
>>>>> bar...)
>>>>> >>>>>>>>>   ...etc);
>>>>> >>>>>>>>>
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <
>>>>> [email protected]> wrote:
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Hello,
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> (my excuses for the long email but this requires context)
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> As part of the move from Source based IOs to DoFn based
>>>>> ones. One pattern
>>>>> >>>>>>>>>> emerged due to the composable nature of DoFn. The idea is
>>>>> to have a different
>>>>> >>>>>>>>>> kind of composable reads where we take a PCollection of
>>>>> different sorts of
>>>>> >>>>>>>>>> intermediate specifications e.g. tables, queries, etc, for
>>>>> example:
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> JdbcIO:
>>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends
>>>>> >>>>>>>>>> PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> RedisIO:
>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<String>,
>>>>> PCollection<KV<String, String>>>
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> HBaseIO:
>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>,
>>>>> PCollection<Result>>
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> These patterns enabled richer use cases like doing multiple
>>>>> queries in the same
>>>>> >>>>>>>>>> Pipeline, querying based on key patterns or querying from
>>>>> multiple tables at the
>>>>> >>>>>>>>>> same time but came with some maintenance issues:
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> - We ended up needing to add to the ReadAll transforms the
>>>>> parameters for
>>>>> >>>>>>>>>>   missing information so we ended up with lots of
>>>>> duplicated with methods and
>>>>> >>>>>>>>>>   error-prone code from the Read transforms into the
>>>>> ReadAll transforms.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> - When you require new parameters you have to expand the
>>>>> input parameters of the
>>>>> >>>>>>>>>>   intermediary specification into something that resembles
>>>>> the full `Read`
>>>>> >>>>>>>>>>   definition for example imagine you want to read from
>>>>> multiple tables or
>>>>> >>>>>>>>>>   servers as part of the same pipeline but this was not in
>>>>> the intermediate
>>>>> >>>>>>>>>>   specification you end up adding those extra methods
>>>>> (duplicating more code)
>>>>> >>>>>>>>>>   just o get close to the be like the Read full spec.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> - If new parameters are added to the Read method we end up
>>>>> adding them
>>>>> >>>>>>>>>>   systematically to the ReadAll transform too so they are
>>>>> taken into account.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Due to these issues I recently did a change to test a new
>>>>> approach that is
>>>>> >>>>>>>>>> simpler, more complete and maintainable. The code became:
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> HBaseIO:
>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>,
>>>>> PCollection<Result>>
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> With this approach users gain benefits of improvements on
>>>>> parameters of normal
>>>>> >>>>>>>>>> Read because they count with the full Read parameters. But
>>>>> of course there are
>>>>> >>>>>>>>>> some minor caveats:
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> 1. You need to push some information into normal Reads for
>>>>> example
>>>>> >>>>>>>>>>    partition boundaries information or Restriction
>>>>> information (in the SDF
>>>>> >>>>>>>>>>    case).  Notice that this consistent approach of ReadAll
>>>>> produces a simple
>>>>> >>>>>>>>>>    pattern that ends up being almost reusable between IOs
>>>>> (e.g. the    non-SDF
>>>>> >>>>>>>>>>    case):
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>>   public static class ReadAll extends
>>>>> PTransform<PCollection<Read>,
>>>>> >>>>>>>>>> PCollection<SolrDocument>> {
>>>>> >>>>>>>>>>     @Override
>>>>> >>>>>>>>>>     public PCollection<SolrDocument>
>>>>> expand(PCollection<Read> input) {
>>>>> >>>>>>>>>>       return input
>>>>> >>>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>>>> >>>>>>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>>>>> >>>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>>>> >>>>>>>>>>     }
>>>>> >>>>>>>>>>   }
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> 2. If you are using Generic types for the results ReadAll
>>>>> you must have the
>>>>> >>>>>>>>>>    Coders used in its definition and require consistent
>>>>> types from the data
>>>>> >>>>>>>>>>    sources, in practice this means we need to add extra
>>>>> withCoder method(s) on
>>>>> >>>>>>>>>>    ReadAll but not the full specs.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow this
>>>>> ReadAll pattern. RedisIO
>>>>> >>>>>>>>>> and CassandraIO have already WIP PRs to do so. So I wanted
>>>>> to bring this subject
>>>>> >>>>>>>>>> to the mailing list to see your opinions, and if you see
>>>>> any sort of issues that
>>>>> >>>>>>>>>> we might be missing with this idea.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Also I would like to see if we have consensus to start
>>>>> using consistently the
>>>>> >>>>>>>>>> terminology of ReadAll transforms based on Read and the
>>>>> readAll() method for new
>>>>> >>>>>>>>>> IOs (at this point probably outdoing this in the only
>>>>> remaining inconsistent
>>>>> >>>>>>>>>> place in JdbcIO might not be a good idea but apart of this
>>>>> we should be ok).
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> I mention this because the recent PR on KafkaIO based on
>>>>> SDF is doing something
>>>>> >>>>>>>>>> similar to the old pattern but being called ReadAll and
>>>>> maybe it is worth to be
>>>>> >>>>>>>>>> consistent for the benefit of users.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Regards,
>>>>> >>>>>>>>>> Ismaël
>>>>> >>
>>>>> >>
>>>>>
>>>>>

Reply via email to