Sorry for jumping into this late and casting a vote against the
consensus... but I think I'd prefer standardizing on a pattern like
PCollection<KafkaSourceDescriptor> rather than PCollection<Read>. That
approach clearly separates the parameters that are allowed to vary across a
ReadAll (the ones defined in KafkaSourceDescriptor) from the parameters
that should be constant (other parameters in the Read object, like
SerializedFunctions for type conversions, parameters for different
operating modes, etc...). I think it's helpful to think of the parameters
that are allowed to vary as some "location descriptor", but I imagine IO
authors may want other parameters to vary across a ReadAll as well.

To me it seems safer to let an IO author "opt-in" to a parameter being
dynamic at execution time.
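
To illustrate the split I have in mind, here is a rough plain-Java sketch with no Beam dependencies. SourceDescriptor and ReadAllSketch are hypothetical names made up for this email, not existing Beam classes, and the "read" is faked: the point is only that the descriptor carries the fields allowed to vary per element, while the deserializer is fixed once at construction time.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical per-element "location descriptor": the only fields allowed to vary. */
final class SourceDescriptor {
  final String topic;
  final int partition;
  final long startOffset;

  SourceDescriptor(String topic, int partition, long startOffset) {
    this.topic = topic;
    this.partition = partition;
    this.startOffset = startOffset;
  }
}

/** Hypothetical ReadAll: everything held here is constant for all input elements. */
final class ReadAllSketch<T> {
  // Fixed at construction time; an element cannot change it at execution time.
  private final Function<byte[], T> deserializer;

  ReadAllSketch(Function<byte[], T> deserializer) {
    this.deserializer = deserializer;
  }

  /** Stand-in for the actual read: fabricates one record per descriptor and decodes it. */
  List<T> expand(List<SourceDescriptor> descriptors) {
    List<T> out = new ArrayList<>();
    for (SourceDescriptor d : descriptors) {
      String fake = d.topic + "-" + d.partition + "@" + d.startOffset;
      out.add(deserializer.apply(fake.getBytes(StandardCharsets.UTF_8)));
    }
    return out;
  }
}
```

An IO author who later wants another parameter to be dynamic would opt in by adding a field to the descriptor, rather than having every Read property become dynamic by default.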

Brian

On Mon, Jun 29, 2020 at 9:26 AM Eugene Kirpichov <kirpic...@google.com>
wrote:

> I'd like to raise one more time the question of consistency between
> dynamic reads and dynamic writes, per my email at the beginning of the
> thread.
> If the community prefers ReadAll to read from Read, then should
> dynamicWrite's write to Write?
>
> On Mon, Jun 29, 2020 at 8:57 AM Boyuan Zhang <boyu...@google.com> wrote:
>
>> It seems like most of us agree on the idea that ReadAll should read from
>> Read. I'm going to update the Kafka ReadAll with the same pattern.
>> Thanks for all your help!
>>
>> On Fri, Jun 26, 2020 at 12:12 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>>
>>>
>>> On Fri, Jun 26, 2020 at 11:49 AM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> I would also like to suggest that transforms that implement ReadAll via
>>>> Read should also provide methods like:
>>>>
>>>> // Uses the specified values if unspecified in the input element from
>>>> // the PCollection<Read>.
>>>> withDefaults(Read read);
>>>> // Uses the specified values regardless of what the input element from
>>>> // the PCollection<Read> specifies.
>>>> withOverrides(Read read);
>>>>
>>>> and only add methods that are required at construction time (e.g.
>>>> coders). This way the majority of documentation sits on the Read transform.
>>>>
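
Inline, to make the proposed semantics concrete: a minimal plain-Java sketch of how the two methods could be resolved per element, assuming a null field means "unspecified". ReadSpec, resolve and pick are hypothetical names for illustration only; the real Read has many more fields and no existing IO is claimed to work this way.

```java
/** Hypothetical stand-in for a Read element; a null field means "not specified". */
final class ReadSpec {
  final String bootstrapServers;
  final String topic;

  ReadSpec(String bootstrapServers, String topic) {
    this.bootstrapServers = bootstrapServers;
    this.topic = topic;
  }

  /** Field-wise merge: overrides win, then the element's own value, then defaults. */
  static ReadSpec resolve(ReadSpec element, ReadSpec defaults, ReadSpec overrides) {
    return new ReadSpec(
        pick(overrides.bootstrapServers, element.bootstrapServers, defaults.bootstrapServers),
        pick(overrides.topic, element.topic, defaults.topic));
  }

  /** Returns the first non-null value in priority order (may still be null). */
  private static String pick(String override, String fromElement, String fallback) {
    if (override != null) {
      return override;
    }
    if (fromElement != null) {
      return fromElement;
    }
    return fallback;
  }
}
```

With this ordering, withOverrides pins a value for every element while withDefaults only fills gaps, which matches the two comments above.
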
>>>
>>> +0 from me. Sounds like benefits outweigh the drawbacks here and some of
>>> the drawbacks related to cross-language can be overcome through future
>>> advancements.
>>> Thanks for bringing this up Ismaël.
>>>
>>> - Cham
>>>
>>>
>>>>
>>>> On Fri, Jun 26, 2020 at 9:58 AM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> Ismael, it is good to hear that using Read as the input didn't leave a
>>>>> bunch of parameters being skipped/ignored. Also, for the polymorphism
>>>>> issue you have to rely on the user correctly telling you the type in
>>>>> such a way that it is a common ancestor of all the runtime types that
>>>>> will ever be used. This usually boils down to something like
>>>>> Serializable or DynamicMessage such that the coder that is chosen works
>>>>> for all the runtime types. Using multiple types is a valid use case and
>>>>> would allow for a simpler graph with fewer flattens merging the output
>>>>> from multiple sources.
>>>>>
>>>>> Boyuan, as you have mentioned we can have a coder for KafkaIO.Read
>>>>> which uses schemas even if some of the parameters can't be represented
>>>>> in a meaningful way beyond "bytes". This would be helpful for cross
>>>>> language as well since every parameter would become available if a
>>>>> language could support it (e.g. it could serialize a java function up
>>>>> front and keep it saved as raw bytes within said language). Even if we
>>>>> figure out a better way to do this in the future, we'll have to change
>>>>> the schema for the new way anyway. This would mean that the external
>>>>> version of the transform adopts Row to Read and we drop
>>>>> KafkaSourceDescriptor. The conversion from Row to Read could validate
>>>>> that the parameters make sense (e.g. the bytes are valid serialized
>>>>> functions). The addition of an endReadTime/endReadOffset would make
>>>>> sense for KafkaIO.Read as well and this would enable having a bounded
>>>>> version that could be used for backfills (this doesn't have to be done
>>>>> as part of any current ongoing PR). Essentially any parameter that
>>>>> could be added for a single instance of a Kafka element+restriction
>>>>> would also make sense to the KafkaIO.Read transform since it too is a
>>>>> single instance. There are parameters that would apply to the ReadAll
>>>>> that wouldn't apply to a read and these would be global parameters
>>>>> across all element+restriction pairs such as config overrides or
>>>>> default values.
>>>>>
>>>>> I am convinced that we should do as Ismael is suggesting and use
>>>>> KafkaIO.Read as the type.
>>>>>
>>>>>
>>>>> On Thu, Jun 25, 2020 at 6:00 PM Chamikara Jayalath <
>>>>> chamik...@google.com> wrote:
>>>>>
>>>>>> Discussion regarding cross-language transforms is a slight tangent
>>>>>> here. But I think, in general, it's great if we can use existing
>>>>>> transforms (for example, IO connectors) as cross-language transforms
>>>>>> without having to build more composites (irrespective of whether in
>>>>>> ExternalTransformBuilders or in user pipelines) just to make them
>>>>>> cross-language compatible. A future cross-language compatible
>>>>>> SchemaCoder might help (assuming that works for the Read transform)
>>>>>> but I'm not sure we have a good idea of when we'll get to that state.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>> On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang <boyu...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> For the unbounded SDF in Kafka, we also consider upgrade/downgrade
>>>>>>> compatibility in the pipeline update scenario (for the detailed
>>>>>>> discussion, please refer to
>>>>>>> https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E).
>>>>>>> To obtain that compatibility, the input of the read SDF must be
>>>>>>> schema-aware.
>>>>>>>
>>>>>>> Thus the major constraint of mapping KafkaSourceDescriptor to
>>>>>>> PCollection<Read> is that KafkaIO.Read also needs to be schema-aware;
>>>>>>> otherwise pipeline updates might fail unnecessarily. Looking into
>>>>>>> KafkaIO.Read, not all necessary fields are compatible with schemas, for
>>>>>>> example, SerializableFunction.
>>>>>>>
>>>>>>> I'm somewhat confused about why ReadAll<Read, OutputT> is a common
>>>>>>> pattern for SDF-based IO. Read can be a common pattern because its
>>>>>>> input is always a PBegin. But for an SDF-based IO, the input can be
>>>>>>> anything. By using Read as input, we will still have the maintenance
>>>>>>> cost when the SDF IO supports a new field but Read doesn't consume it.
>>>>>>> For example, we are discussing adding endOffset and endReadTime to
>>>>>>> KafkaSourceDescriptor, which are not used in KafkaIO.Read.
>>>>>>>
>>>>>>> On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía <ieme...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> We forgot to mention (5) External.Config used in cross-lang, see
>>>>>>>> KafkaIO ExternalTransformBuilder. This approach is the predecessor of
>>>>>>>> (4) and probably a really good candidate to be replaced by the Row
>>>>>>>> based Configuration Boyuan is envisioning (so good to be aware of
>>>>>>>> this).
>>>>>>>>
>>>>>>>> Thanks for the clear explanation Luke, you mention the real issue(s).
>>>>>>>> All the approaches discussed so far could in the end be easily
>>>>>>>> transformed to produce a PCollection<Read>, and those Read elements
>>>>>>>> could be read by the generic ReadAll transform. Notice that this can
>>>>>>>> be internal in some IOs, e.g. KafkaIO, if they decide not to expose
>>>>>>>> it. I am not saying that we should force every IO to support ReadAll
>>>>>>>> in its public API, but if we do, it is probably a good idea to be
>>>>>>>> consistent and name the transform that expects an input
>>>>>>>> PCollection<Read> in the same way. Also notice that using it will
>>>>>>>> save us from the maintenance issues discussed in my previous email.
>>>>>>>>
>>>>>>>> Back to the main concern: the consequences of expansion based on
>>>>>>>> Read. So far I have not seen consequences for the Splitting part,
>>>>>>>> which maps really nicely assuming the Partition info / Restriction is
>>>>>>>> available as part of Read. So far there are no Serialization issues
>>>>>>>> either, because Beam is already enforcing this. Notice that the
>>>>>>>> ReadAll expansion is almost ‘equivalent’ to a poor man's SDF, at
>>>>>>>> least for the Bounded case (see the code in my previous email). For
>>>>>>>> the other points:
>>>>>>>>
>>>>>>>> > a) Do all properties set on a Read apply to the ReadAll? For
>>>>>>>> > example, the Kafka Read implementation allows you to set the key
>>>>>>>> > and value deserializers which are also used to dictate the output
>>>>>>>> > PCollection type. It also allows you to set how the watermark
>>>>>>>> > should be computed. Technically a user may want the watermark
>>>>>>>> > computation to be configurable per Read and they may also want an
>>>>>>>> > output type which is polymorphic (e.g. PCollection<Serializable>).
>>>>>>>>
>>>>>>>> Most of the time they do, but for parametric types we cannot support
>>>>>>>> different types in the outputs of the Read, or at least I did not
>>>>>>>> find how to do so (is there a way to use multiple output Coders in
>>>>>>>> Beam?). We saw this in CassandraIO and we were discussing explicitly
>>>>>>>> adding these Coder- or Serializer-specific methods to the ReadAll
>>>>>>>> transform. This is less nice because it will imply some repeated
>>>>>>>> methods, but it is still a compromise to gain the other advantages.
>>>>>>>> I suppose the watermark case you mention is similar because you may
>>>>>>>> want the watermark to behave differently in each Read and we
>>>>>>>> probably don’t support this, so it corresponds to the polymorphic
>>>>>>>> category.
>>>>>>>>
>>>>>>>> > b) Read extends PTransform which brings its own object modelling
>>>>>>>> > concerns.
>>>>>>>>
>>>>>>>> > During the implementations of ReadAll(PCollection<Read>), was it
>>>>>>>> > discovered that some properties became runtime errors or were
>>>>>>>> > ignored if they were set? If no, then the code deduplication is
>>>>>>>> > likely worth it because we also get a lot of javadoc deduplication,
>>>>>>>> > but if yes is this an acceptable user experience?
>>>>>>>>
>>>>>>>> No, not so far. This is an interesting part: notice that the Read
>>>>>>>> translation ends up delegating the read bits to the ReadFn part of
>>>>>>>> ReadAll, so the ReadFn is the real read and must be aware of and use
>>>>>>>> all the parameters.
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public PCollection<SolrDocument> expand(PBegin input) {
>>>>>>>>       return input.apply("Create", Create.of(this)).apply("ReadAll", readAll());
>>>>>>>>     }
>>>>>>>>
>>>>>>>> I might be missing something for the Unbounded SDF case, which is the
>>>>>>>> only case we have not explored so far. I think one easy way to see
>>>>>>>> the limitations would be, in the ongoing KafkaIO SDF based
>>>>>>>> implementation, to try to map KafkaSourceDescriptor to the extra
>>>>>>>> PCollection<Read> and put the Read logic on the ReadAll with the SDF
>>>>>>>> to see which constraints we hit. The polymorphic ones will be there
>>>>>>>> for sure, maybe others will appear (not sure). It would be
>>>>>>>> interesting to see if we have a real gain in the maintenance points,
>>>>>>>> but let’s also not forget that KafkaIO has a LOT of knobs, so the
>>>>>>>> generic implementation could probably be relatively complex.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>> >
>>>>>>>> > I had mentioned that approach 1 and approach 2 work for cross
>>>>>>>> > language. The difference being that the cross language transform
>>>>>>>> > would take a well known definition and convert it to the Read
>>>>>>>> > transform. A normal user would have a pipeline that would look like:
>>>>>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
>>>>>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) ->
>>>>>>>> >    PCollection<Output>
>>>>>>>> >
>>>>>>>> > And in the cross language case this would look like:
>>>>>>>> > 1: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row
>>>>>>>> >    to Read) -> PCollection<Read> -> PTransform(ReadAll) ->
>>>>>>>> >    PCollection<Output>
>>>>>>>> > 2: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row
>>>>>>>> >    to SourceDescriptor) -> PCollection<SourceDescriptor> ->
>>>>>>>> >    PTransform(ReadAll) -> PCollection<Output>*
>>>>>>>> > * note that PTransform(Convert Row to SourceDescriptor) only exists
>>>>>>>> >   since we haven't solved how to use schemas with language bound
>>>>>>>> >   types in a cross language way. SchemaCoder isn't portable but
>>>>>>>> >   RowCoder is which is why the conversion step exists. We could
>>>>>>>> >   have a solution for this at some point in time.
>>>>>>>> >
>>>>>>>> > My concern with using Read was around:
>>>>>>>> > a) Do all properties set on a Read apply to the ReadAll? For
>>>>>>>> >    example, the Kafka Read implementation allows you to set the key
>>>>>>>> >    and value deserializers which are also used to dictate the output
>>>>>>>> >    PCollection type. It also allows you to set how the watermark
>>>>>>>> >    should be computed. Technically a user may want the watermark
>>>>>>>> >    computation to be configurable per Read and they may also want an
>>>>>>>> >    output type which is polymorphic (e.g. PCollection<Serializable>).
>>>>>>>> > b) Read extends PTransform which brings its own object modelling
>>>>>>>> >    concerns.
>>>>>>>> >
>>>>>>>> > During the implementations of ReadAll(PCollection<Read>), was it
>>>>>>>> > discovered that some properties became runtime errors or were
>>>>>>>> > ignored if they were set? If no, then the code deduplication is
>>>>>>>> > likely worth it because we also get a lot of javadoc deduplication,
>>>>>>>> > but if yes is this an acceptable user experience?
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <
>>>>>>>> aromanenko....@gmail.com> wrote:
>>>>>>>> >>
>>>>>>>> >> I believe that the initial goal of unifying ReadAll as a general
>>>>>>>> >> "PTransform<PCollection<Read>, PCollection<OutputType>>" was to
>>>>>>>> >> reduce the amount of code duplication and the error-prone approach
>>>>>>>> >> related to it. It makes a lot of sense since we usually have all the
>>>>>>>> >> needed configuration set in Read objects and, as Ismaël mentioned,
>>>>>>>> >> ReadAll will consist mostly of only Split-Shuffle-Read stages. So
>>>>>>>> >> this case can usually be unified by using PCollection<Read> as
>>>>>>>> >> input.
>>>>>>>> >>
>>>>>>>> >> On the other hand, we have another need to use Java IOs as
>>>>>>>> >> cross-language transforms (as Luke described), which seems to
>>>>>>>> >> overlap only partly with the previous pattern of ReadAll usage.
>>>>>>>> >>
>>>>>>>> >> I’d be more in favour of having only one concept of read
>>>>>>>> >> configuration for all needs, but it seems that is not easy, so I’d
>>>>>>>> >> lean towards Luke and Boyuan's approach with schema. Though, maybe
>>>>>>>> >> ReadAll is not a very suitable name in this case because it can
>>>>>>>> >> cause some confusion with the previous pattern of ReadAll uses.
>>>>>>>> >>
>>>>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang <boyu...@google.com>
>>>>>>>> wrote:
>>>>>>>> >>
>>>>>>>> >> Sorry for the typo. I mean I think we can go with (3) and (4):
>>>>>>>> >> use the data type that is schema-aware as the input of ReadAll.
>>>>>>>> >>
>>>>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <boyu...@google.com>
>>>>>>>> wrote:
>>>>>>>> >>>
>>>>>>>> >>> Thanks for the summary, Cham!
>>>>>>>> >>>
>>>>>>>> >>> I think we can go with (2) and (4): use the data type that is
>>>>>>>> >>> schema-aware as the input of ReadAll.
>>>>>>>> >>>
>>>>>>>> >>> Converting Read into ReadAll helps us to stick with SDF-like IO.
>>>>>>>> >>> But only having (3) is not enough to solve the problem of using
>>>>>>>> >>> ReadAll in the x-lang case.
>>>>>>>> >>>
>>>>>>>> >>> The key point of ReadAll is that the input type of ReadAll should
>>>>>>>> >>> be able to cross language boundaries and stay compatible across
>>>>>>>> >>> upgrades/downgrades. After investigating some possibilities (pure
>>>>>>>> >>> java pojo with custom coder, protobuf, row/schema) in Kafka usage,
>>>>>>>> >>> we find that row/schema fits our needs most. Here comes (4). I
>>>>>>>> >>> believe that using Read as input of ReadAll makes sense in some
>>>>>>>> >>> cases, but I also think not all IOs have the same need. I would
>>>>>>>> >>> treat Read as a special type as long as the Read is schema-aware.
>>>>>>>> >>>
>>>>>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <
>>>>>>>> chamik...@google.com> wrote:
>>>>>>>> >>>>
>>>>>>>> >>>> I see. So it seems like there are three options discussed so far
>>>>>>>> >>>> when it comes to defining source descriptors for ReadAll type
>>>>>>>> >>>> transforms:
>>>>>>>> >>>>
>>>>>>>> >>>> (1) Use Read PTransform as the element type of the input
>>>>>>>> >>>>     PCollection
>>>>>>>> >>>> (2) Use a POJO that describes the source as the data element of
>>>>>>>> >>>>     the input PCollection
>>>>>>>> >>>> (3) Provide a converter as a function to the Read transform which
>>>>>>>> >>>>     essentially will convert it to a ReadAll (what Eugene mentioned)
>>>>>>>> >>>>
>>>>>>>> >>>> I feel like (3) is more suitable for a related set of source
>>>>>>>> >>>> descriptions such as files.
>>>>>>>> >>>> (1) will allow the most code reuse but seems like it will make it
>>>>>>>> >>>> hard to use the ReadAll transform as a cross-language transform and
>>>>>>>> >>>> will break the separation of construction time and runtime
>>>>>>>> >>>> constructs.
>>>>>>>> >>>> (2) could result in less code reuse if not careful but will make
>>>>>>>> >>>> the transform easier to use as a cross-language transform without
>>>>>>>> >>>> additional modifications.
>>>>>>>> >>>>
>>>>>>>> >>>> Also, with SDF, we can create ReadAll-like transforms that are
>>>>>>>> >>>> more efficient. So we might be able to just define all sources in
>>>>>>>> >>>> that format and make Read transforms just an easy to use composite
>>>>>>>> >>>> built on top of that (by adding a preceding Create transform).
>>>>>>>> >>>>
>>>>>>>> >>>> Thanks,
>>>>>>>> >>>> Cham
>>>>>>>> >>>>
>>>>>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <lc...@google.com>
>>>>>>>> wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>> I believe we do require PTransforms to be serializable since
>>>>>>>> anonymous DoFns typically capture the enclosing PTransform.
>>>>>>>> >>>>>
>>>>>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <
>>>>>>>> chamik...@google.com> wrote:
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> Seems like Read in PCollection<Read> refers to a transform,
>>>>>>>> at least here:
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> I'm in favour of separating construction time transforms from
>>>>>>>> >>>>>> execution time data objects that we store in PCollections, as Luke
>>>>>>>> >>>>>> mentioned. Also, we don't guarantee that PTransform is serializable,
>>>>>>>> >>>>>> so users have the additional complexity of providing a coder
>>>>>>>> >>>>>> whenever a PTransform is used as a data object.
>>>>>>>> >>>>>> Also, I agree with Boyuan that using simple Java objects that are
>>>>>>>> >>>>>> convertible to Beam Rows allows us to make these transforms
>>>>>>>> >>>>>> available to other SDKs through the cross-language transforms. Using
>>>>>>>> >>>>>> transforms or complex sources as data objects will probably make
>>>>>>>> >>>>>> this difficult.
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> Thanks,
>>>>>>>> >>>>>> Cham
>>>>>>>> >>>>>>
>>>>>>>> >>>>>>
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <
>>>>>>>> boyu...@google.com> wrote:
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> Hi Ismael,
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> I think the ReadAll in the IO connector refers to the IO with an
>>>>>>>> >>>>>>> SDF implementation regardless of the input type, whereas Read
>>>>>>>> >>>>>>> refers to UnboundedSource. One major pushback against using
>>>>>>>> >>>>>>> KafkaIO.Read as the source description is that not all
>>>>>>>> >>>>>>> configurations of KafkaIO.Read are meaningful to populate at
>>>>>>>> >>>>>>> execution time. Also, when thinking about x-lang usage, making the
>>>>>>>> >>>>>>> source description work across language boundaries is also
>>>>>>>> >>>>>>> necessary. As Luke mentioned, it's quite easy to infer a Schema
>>>>>>>> >>>>>>> from an AutoValue object: KafkaSourceDescription.java. Then the
>>>>>>>> >>>>>>> coder of this schema-aware object will be a SchemaCoder. When
>>>>>>>> >>>>>>> crossing language boundaries, it's also easy to convert a Row into
>>>>>>>> >>>>>>> the source description: Convert.fromRows.
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <lc...@google.com>
>>>>>>>> wrote:
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> To provide additional context, the KafkaIO ReadAll transform
>>>>>>>> >>>>>>>> takes a PCollection<KafkaSourceDescriptor>. This
>>>>>>>> >>>>>>>> KafkaSourceDescriptor is a POJO that contains the configurable
>>>>>>>> >>>>>>>> parameters for reading from Kafka. This is different from the
>>>>>>>> >>>>>>>> pattern that Ismael listed because they take PCollection<Read> as
>>>>>>>> >>>>>>>> input and the Read is the same as the Read PTransform class used
>>>>>>>> >>>>>>>> for the non read all case.
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> The KafkaSourceDescriptor does lead to duplication since
>>>>>>>> >>>>>>>> parameters used to configure the transform have to be copied over
>>>>>>>> >>>>>>>> to the source descriptor but decouples how a transform is
>>>>>>>> >>>>>>>> specified from the object that describes what needs to be done. I
>>>>>>>> >>>>>>>> believe Ismael's point is that we wouldn't need such a decoupling.
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> Another area that hasn't been discussed and I believe is a
>>>>>>>> >>>>>>>> non-issue is that the Beam Java SDK has the most IO connectors and
>>>>>>>> >>>>>>>> we would want to use the IO implementations within Beam Go and
>>>>>>>> >>>>>>>> Beam Python. This brings in its own set of issues related to
>>>>>>>> >>>>>>>> versioning and compatibility for the wire format and how one
>>>>>>>> >>>>>>>> parameterizes such transforms. The wire format issue can be solved
>>>>>>>> >>>>>>>> with either approach by making sure that the cross language
>>>>>>>> >>>>>>>> expansion always takes the well known format (whatever it may be)
>>>>>>>> >>>>>>>> and converts it into Read/KafkaSourceDescriptor/... object that is
>>>>>>>> >>>>>>>> then passed to the ReadAll transform. Boyuan has been looking to
>>>>>>>> >>>>>>>> make the KafkaSourceDescriptor have a schema so it can be
>>>>>>>> >>>>>>>> represented as a row and this can be done easily using the
>>>>>>>> >>>>>>>> AutoValue integration (I don't believe there is anything
>>>>>>>> >>>>>>>> preventing someone from writing a schema row -> Read -> row
>>>>>>>> >>>>>>>> adapter or also using the AutoValue configuration if the transform
>>>>>>>> >>>>>>>> is also an AutoValue).
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> I would be more for the code duplication and separation of
>>>>>>>> >>>>>>>> concerns provided by using a different object to represent the
>>>>>>>> >>>>>>>> contents of the PCollection from the pipeline construction time
>>>>>>>> >>>>>>>> PTransform.
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <
>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> Hi Ismael,
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> Thanks for taking this on. Have you considered an approach
>>>>>>>> >>>>>>>>> similar (or dual) to FileIO.write(), where we in a sense also
>>>>>>>> >>>>>>>>> have to configure a dynamic number of different IO transforms of
>>>>>>>> >>>>>>>>> the same type (file writes)?
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> E.g. how in this example we configure many aspects of many file
>>>>>>>> >>>>>>>>> writes:
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
>>>>>>>> >>>>>>>>>      .by(Transaction::getType)
>>>>>>>> >>>>>>>>>      // Convert the data to be written to CSVSink
>>>>>>>> >>>>>>>>>      .via(tx -> tx.getType().toFields(tx),
>>>>>>>> >>>>>>>>>           type -> new CSVSink(type.getFieldNames()))
>>>>>>>> >>>>>>>>>      .to(".../path/to/")
>>>>>>>> >>>>>>>>>      .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> we could do something similar for many JdbcIO reads:
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> // user-specific type from which all the read parameters can be inferred
>>>>>>>> >>>>>>>>> PCollection<Bar> bars;
>>>>>>>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>>>>>>>> >>>>>>>>>   .fromQuery(bar -> ...compute query for this bar...)
>>>>>>>> >>>>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
>>>>>>>> >>>>>>>>>   .withBatchSize(bar -> ...compute batch size for this bar...)
>>>>>>>> >>>>>>>>>   ...etc);
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <
>>>>>>>> ieme...@gmail.com> wrote:
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> Hello,
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> (my excuses for the long email but this requires context)
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> As part of the move from Source based IOs to DoFn based ones, one
>>>>>>>> >>>>>>>>>> pattern emerged due to the composable nature of DoFn. The idea is
>>>>>>>> >>>>>>>>>> to have a different kind of composable reads where we take a
>>>>>>>> >>>>>>>>>> PCollection of different sorts of intermediate specifications
>>>>>>>> >>>>>>>>>> e.g. tables, queries, etc, for example:
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> JdbcIO:
>>>>>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends
>>>>>>>> >>>>>>>>>> PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> RedisIO:
>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<String>, PCollection<KV<String, String>>>
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> HBaseIO:
>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>, PCollection<Result>>
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> These patterns enabled richer use cases like doing multiple
>>>>>>>> >>>>>>>>>> queries in the same Pipeline, querying based on key patterns or
>>>>>>>> >>>>>>>>>> querying from multiple tables at the same time but came with some
>>>>>>>> >>>>>>>>>> maintenance issues:
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> - We ended up needing to add to the ReadAll transforms the
>>>>>>>> >>>>>>>>>>   parameters for missing information, so we ended up with lots of
>>>>>>>> >>>>>>>>>>   duplicated `with` methods and error-prone code copied from the
>>>>>>>> >>>>>>>>>>   Read transforms into the ReadAll transforms.
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> - When you require new parameters you have to expand the input
>>>>>>>> >>>>>>>>>>   parameters of the intermediary specification into something
>>>>>>>> >>>>>>>>>>   that resembles the full `Read` definition. For example, imagine
>>>>>>>> >>>>>>>>>>   you want to read from multiple tables or servers as part of the
>>>>>>>> >>>>>>>>>>   same pipeline but this was not in the intermediate
>>>>>>>> >>>>>>>>>>   specification: you end up adding those extra methods
>>>>>>>> >>>>>>>>>>   (duplicating more code) just to get close to the full Read spec.
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> - If new parameters are added to the Read method we end up adding
>>>>>>>> >>>>>>>>>>   them systematically to the ReadAll transform too so they are
>>>>>>>> >>>>>>>>>>   taken into account.
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> Due to these issues I recently did a change to test a new
>>>>>>>> >>>>>>>>>> approach that is simpler, more complete and maintainable. The
>>>>>>>> >>>>>>>>>> code became:
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> HBaseIO:
>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> With this approach users gain the benefits of improvements to the
>>>>>>>> >>>>>>>>>> parameters of the normal Read because they can count on the full
>>>>>>>> >>>>>>>>>> Read parameters. But of course there are some minor caveats:
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> 1. You need to push some information into normal Reads, for
>>>>>>>> >>>>>>>>>>    example partition boundaries information or Restriction
>>>>>>>> >>>>>>>>>>    information (in the SDF case). Notice that this consistent
>>>>>>>> >>>>>>>>>>    approach of ReadAll produces a simple pattern that ends up
>>>>>>>> >>>>>>>>>>    being almost reusable between IOs (e.g. the non-SDF case):
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>>   public static class ReadAll
>>>>>>>> >>>>>>>>>>       extends PTransform<PCollection<Read>, PCollection<SolrDocument>> {
>>>>>>>> >>>>>>>>>>     @Override
>>>>>>>> >>>>>>>>>>     public PCollection<SolrDocument> expand(PCollection<Read> input) {
>>>>>>>> >>>>>>>>>>       return input
>>>>>>>> >>>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>>>>>>> >>>>>>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>>>>>>>> >>>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>>>>>>> >>>>>>>>>>     }
>>>>>>>> >>>>>>>>>>   }
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> 2. If you are using Generic types for the results of ReadAll you
>>>>>>>> >>>>>>>>>>    must have the Coders used in its definition and require
>>>>>>>> >>>>>>>>>>    consistent types from the data sources. In practice this means
>>>>>>>> >>>>>>>>>>    we need to add extra withCoder method(s) on ReadAll but not
>>>>>>>> >>>>>>>>>>    the full specs.
>>>>>>>> >>>>>>>>>>
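
Inline, for readers who want to see the Split / Reshuffle / Read shape from caveat 1 without a runner: a plain-Java sketch using lists in place of PCollections. RangeRead and ReadAllPipelineSketch are hypothetical names, the range fields stand in for the partition/Restriction information pushed into Read, and Collections.shuffle is only a stand-in for what Reshuffle does in a real pipeline.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical Read spec that carries its own range, as caveat 1 requires. */
final class RangeRead {
  final String table;
  final int start; // inclusive
  final int end;   // exclusive

  RangeRead(String table, int start, int end) {
    this.table = table;
    this.start = start;
    this.end = end;
  }
}

final class ReadAllPipelineSketch {
  /** "Split": break each Read into Reads covering at most chunk rows (chunk must be > 0). */
  static List<RangeRead> split(List<RangeRead> reads, int chunk) {
    List<RangeRead> out = new ArrayList<>();
    for (RangeRead r : reads) {
      for (int s = r.start; s < r.end; s += chunk) {
        out.add(new RangeRead(r.table, s, Math.min(s + chunk, r.end)));
      }
    }
    return out;
  }

  /** "Reshuffle": stand-in for redistributing work; here just a random permutation. */
  static List<RangeRead> reshuffle(List<RangeRead> reads) {
    List<RangeRead> copy = new ArrayList<>(reads);
    Collections.shuffle(copy);
    return copy;
  }

  /** "Read": stand-in for the real read; emits one fake row id per position in the range. */
  static List<String> read(List<RangeRead> reads) {
    List<String> rows = new ArrayList<>();
    for (RangeRead r : reads) {
      for (int i = r.start; i < r.end; i++) {
        rows.add(r.table + "#" + i);
      }
    }
    return rows;
  }

  /** The three stages chained in the same order as the expand() above. */
  static List<String> readAll(List<RangeRead> reads, int chunk) {
    return read(reshuffle(split(reads, chunk)));
  }
}
```

Because every stage only consumes and produces Read-like elements, the same three-step skeleton can be reused across IOs; only the split and read bodies are IO-specific. Output order is not deterministic after the shuffle.
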
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow this ReadAll
>>>>>>>> >>>>>>>>>> pattern. RedisIO and CassandraIO already have WIP PRs to do so.
>>>>>>>> >>>>>>>>>> So I wanted to bring this subject to the mailing list to see your
>>>>>>>> >>>>>>>>>> opinions, and if you see any sort of issues that we might be
>>>>>>>> >>>>>>>>>> missing with this idea.
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> Also I would like to see if we have consensus to start
>>>>>>>> >>>>>>>>>> consistently using the terminology of ReadAll transforms based on
>>>>>>>> >>>>>>>>>> Read and the readAll() method for new IOs (at this point probably
>>>>>>>> >>>>>>>>>> undoing this in the only remaining inconsistent place, JdbcIO,
>>>>>>>> >>>>>>>>>> might not be a good idea but apart from this we should be ok).
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> I mention this because the recent PR on KafkaIO based on SDF is
>>>>>>>> >>>>>>>>>> doing something similar to the old pattern but being called
>>>>>>>> >>>>>>>>>> ReadAll and maybe it is worth being consistent for the benefit of
>>>>>>>> >>>>>>>>>> users.
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> Regards,
>>>>>>>> >>>>>>>>>> Ismaël
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>>
>>>>>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>> >
>>>>>>>> > I had mentioned that approach 1 and approach 2 work for cross
>>>>>>>> language. The difference being that the cross language transform would 
>>>>>>>> take
>>>>>>>> a well known definition and convert it to the Read transform. A normal 
>>>>>>>> user
>>>>>>>> would have a pipeline that would look like:
>>>>>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
>>>>>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) -> PCollection<Output>
>>>>>>>> >
>>>>>>>> > And in the cross language case this would look like:
>>>>>>>> > 1: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to Read) -> PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
>>>>>>>> > 2: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to SourceDescriptor) -> PCollection<SourceDescriptor> -> PTransform(ReadAll) -> PCollection<Output>*
>>>>>>>> > * note that PTransform(Convert Row to SourceDescriptor) only
>>>>>>>> exists since we haven't solved how to use schemas with language bound 
>>>>>>>> types
>>>>>>>> in a cross language way. SchemaCoder isn't portable but RowCoder is 
>>>>>>>> which
>>>>>>>> is why the conversion step exists. We could have a solution for this at
>>>>>>>> some point in time.
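
The cross-language flow for approach 2 described above can be sketched in plain Java with no Beam dependency. This is only an illustration under stated assumptions: `SourceDescriptor`, `fromRow`, the `topic`/`partition` fields, and the use of a `Map` as a stand-in for a language-neutral row are all hypothetical and are not Beam APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a source descriptor POJO; not a Beam class.
final class SourceDescriptor {
  final String topic;
  final int partition;

  SourceDescriptor(String topic, int partition) {
    this.topic = topic;
    this.partition = partition;
  }

  // Converts a language-neutral "row" (modelled here as a Map) into the typed descriptor.
  static SourceDescriptor fromRow(Map<String, Object> row) {
    Object topic = row.get("topic");
    Object partition = row.get("partition");
    if (!(topic instanceof String) || !(partition instanceof Integer)) {
      throw new IllegalArgumentException("Row does not match the descriptor schema: " + row);
    }
    return new SourceDescriptor((String) topic, (Integer) partition);
  }
}

final class CrossLanguageSketch {
  // Stand-in for the ReadAll step: produces one output string per descriptor.
  static List<String> readAll(List<SourceDescriptor> descriptors) {
    List<String> out = new ArrayList<>();
    for (SourceDescriptor d : descriptors) {
      out.add("read " + d.topic + "/" + d.partition);
    }
    return out;
  }

  // Builds a row the way a non-Java SDK might hand it over (assumed field names).
  static Map<String, Object> row(String topic, int partition) {
    Map<String, Object> r = new LinkedHashMap<>();
    r.put("topic", topic);
    r.put("partition", partition);
    return r;
  }

  public static void main(String[] args) {
    List<SourceDescriptor> descriptors = new ArrayList<>();
    for (Map<String, Object> r : Arrays.asList(row("orders", 0), row("orders", 1))) {
      descriptors.add(SourceDescriptor.fromRow(r)); // the "Convert Row to SourceDescriptor" step
    }
    System.out.println(readAll(descriptors));
  }
}
```

The point of the sketch is that only the conversion step knows about the wire format; the read step sees a typed descriptor in both the normal and the cross-language case.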
>>>>>>>> >
>>>>>>>> > My concern with using Read was around:
>>>>>>>> > a) Do all properties set on a Read apply to the ReadAll? For
>>>>>>>> example, the Kafka Read implementation allows you to set the key and 
>>>>>>>> value
>>>>>>>> deserializers which are also used to dictate the output PCollection 
>>>>>>>> type.
>>>>>>>> It also allows you to set how the watermark should be computed. 
>>>>>>>> Technically
>>>>>>>> a user may want the watermark computation to be configurable per Read 
>>>>>>>> and
>>>>>>>> they may also want an output type which is polymorphic (e.g.
>>>>>>>> PCollection<Serializable>).
>>>>>>>> > b) Read extends PTransform which brings its own object modelling
>>>>>>>> concerns.
>>>>>>>> >
>>>>>>>> > During the implementations of ReadAll(PCollection<Read>), was it
>>>>>>>> discovered that some properties became runtime errors or were ignored 
>>>>>>>> if
>>>>>>>> they were set? If no, then the code deduplication is likely worth it
>>>>>>>> because we also get a lot of javadoc deduplication, but if yes is this 
>>>>>>>> an
>>>>>>>> acceptable user experience?
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <
>>>>>>>> aromanenko....@gmail.com> wrote:
>>>>>>>> >>
>>>>>>>> >> I believe that the initial goal of unifying ReadAll as a general
>>>>>>>> "PTransform<PCollection<Read>, PCollection<OutputType>>" was to reduce
>>>>>>>> the amount of code duplication and the error-prone approach that comes
>>>>>>>> with it. It makes a lot of sense since we usually have all the needed
>>>>>>>> configuration set in Read objects and, as Ismaël mentioned, ReadAll will
>>>>>>>> consist mostly of only Split-Shuffle-Read stages. So this case can
>>>>>>>> usually be unified by using PCollection<Read> as input.
>>>>>>>> >>
>>>>>>>> >> On the other hand, we have another need to use Java IOs as
>>>>>>>> cross-language transforms (as Luke described), which seems to overlap
>>>>>>>> only partly with the previous pattern of using ReadAll.
>>>>>>>> >>
>>>>>>>> >> I'd be more in favour of having only one concept of read
>>>>>>>> configuration for all needs, but it seems that's not easy, so I'd lean
>>>>>>>> towards Luke and Boyuan's approach with schema. Though maybe ReadAll is
>>>>>>>> not a very suitable name in this case because it can cause some
>>>>>>>> confusion with the previous pattern of ReadAll uses.
>>>>>>>> >>
>>>>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang <boyu...@google.com>
>>>>>>>> wrote:
>>>>>>>> >>
>>>>>>>> >> Sorry for the typo. I mean I think we can go with (3) and (4):
>>>>>>>> use the data type that is schema-aware as the input of ReadAll.
>>>>>>>> >>
>>>>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <boyu...@google.com>
>>>>>>>> wrote:
>>>>>>>> >>>
>>>>>>>> >>> Thanks for the summary, Cham!
>>>>>>>> >>>
>>>>>>>> >>> I think we can go with (2) and (4): use the data type that is
>>>>>>>> schema-aware as the input of ReadAll.
>>>>>>>> >>>
>>>>>>>> >>> Converting Read into ReadAll helps us to stick with SDF-like
>>>>>>>> IO. But only having  (3) is not enough to solve the problem of using
>>>>>>>> ReadAll in x-lang case.
>>>>>>>> >>>
>>>>>>>> >>> The key point of ReadAll is that the input type of ReadAll
>>>>>>>> should be able to cross language boundaries and stay compatible when
>>>>>>>> updating/downgrading. After investigating some possibilities (pure Java
>>>>>>>> POJO with custom coder, protobuf, row/schema) in Kafka usage, we found
>>>>>>>> that row/schema fits our needs best. Here comes (4). I believe that
>>>>>>>> using Read as input of ReadAll makes sense in some cases, but I also
>>>>>>>> think not all IOs have the same need. I would treat Read as a special
>>>>>>>> type as long as the Read is schema-aware.
>>>>>>>> >>>
>>>>>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <
>>>>>>>> chamik...@google.com> wrote:
>>>>>>>> >>>>
>>>>>>>> >>>> I see. So it seems like there are three options discussed so
>>>>>>>> far when it comes to defining source descriptors for ReadAll type 
>>>>>>>> transforms
>>>>>>>> >>>>
>>>>>>>> >>>> (1) Use Read PTransform as the element type of the input
>>>>>>>> PCollection
>>>>>>>> >>>> (2) Use a POJO that describes the source as the data element
>>>>>>>> of the input PCollection
>>>>>>>> >>>> (3) Provide a converter as a function to the Read transform
>>>>>>>> which essentially will convert it to a ReadAll (what Eugene mentioned)
>>>>>>>> >>>>
>>>>>>>> >>>> I feel like (3) is more suitable for a related set of source
>>>>>>>> descriptions such as files.
>>>>>>>> >>>> (1) will allow the most code reuse, but it seems it will make it
>>>>>>>> hard to use the ReadAll transform as a cross-language transform and will
>>>>>>>> break the separation of construction-time and runtime constructs.
>>>>>>>> >>>> (2) could result in less code reuse if we're not careful, but will
>>>>>>>> make the transform easier to use as a cross-language transform without
>>>>>>>> additional modifications.
>>>>>>>> >>>>
>>>>>>>> >>>> Also, with SDF, we can create ReadAll-like transforms that are
>>>>>>>> more efficient. So we might be able to just define all sources in that
>>>>>>>> format and make Read transforms just an easy to use composite built on 
>>>>>>>> top
>>>>>>>> of that (by adding a preceding Create transform).
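
The idea of making Read a thin composite over a ReadAll-style transform can be sketched in plain Java, without Beam. This is a minimal illustration, not the Beam implementation: `readAll`, `read`, and the string "sources" are hypothetical, and the one-element list plays the role that a preceding Create transform would play in a real pipeline.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class ReadAsCompositeSketch {
  // Stand-in for a ReadAll-style transform: handles any number of source descriptions.
  static List<String> readAll(List<String> sources) {
    List<String> records = new ArrayList<>();
    for (String source : sources) {
      records.add("record from " + source);
    }
    return records;
  }

  // Read as a thin composite: wrap the single source in a one-element collection
  // (the role Create would play in a pipeline) and delegate to readAll.
  static List<String> read(String source) {
    return readAll(Collections.singletonList(source));
  }

  public static void main(String[] args) {
    System.out.println(read("topic-a"));
    System.out.println(readAll(Arrays.asList("topic-a", "topic-b")));
  }
}
```

With this shape, all reading logic lives in one place and the single-source entry point adds no behaviour of its own.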
>>>>>>>> >>>>
>>>>>>>> >>>> Thanks,
>>>>>>>> >>>> Cham
>>>>>>>> >>>>
>>>>>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <lc...@google.com>
>>>>>>>> wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>> I believe we do require PTransforms to be serializable since
>>>>>>>> anonymous DoFns typically capture the enclosing PTransform.
>>>>>>>> >>>>>
>>>>>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <
>>>>>>>> chamik...@google.com> wrote:
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> Seems like Read in PCollection<Read> refers to a transform,
>>>>>>>> at least here:
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> I'm in favour of separating construction time transforms
>>>>>>>> from execution time data objects that we store in PCollections as Luke
>>>>>>>> mentioned. Also, we don't guarantee that PTransform is serializable so
>>>>>>>> users have the additional complexity of providing a coder whenever a
>>>>>>>> PTransform is used as a data object.
>>>>>>>> >>>>>> Also, agree with Boyuan that using simple Java objects that
>>>>>>>> are convertible to Beam Rows allows us to make these transforms
>>>>>>>> available to other SDKs through the cross-language transforms. Using
>>>>>>>> transforms or complex sources as data objects will probably make this
>>>>>>>> difficult.
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> Thanks,
>>>>>>>> >>>>>> Cham
>>>>>>>> >>>>>>
>>>>>>>> >>>>>>
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <
>>>>>>>> boyu...@google.com> wrote:
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> Hi Ismael,
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> I think the ReadAll in the IO connector refers to the IO
>>>>>>>> with SDF implementation regardless of the type of input, whereas Read
>>>>>>>> refers to UnboundedSource. One major pushback against using KafkaIO.Read
>>>>>>>> as the source description is that not all configurations of KafkaIO.Read
>>>>>>>> are meaningful to populate during execution time. Also, when thinking
>>>>>>>> about x-lang usage, making the source description work across language
>>>>>>>> boundaries is also necessary. As Luke mentioned, it's quite easy to
>>>>>>>> infer a Schema from an AutoValue object: KafkaSourceDescription.java.
>>>>>>>> Then the coder of this schema-aware object will be a SchemaCoder. When
>>>>>>>> crossing language boundaries, it's also easy to convert a Row into the
>>>>>>>> source description: Convert.fromRows.
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <lc...@google.com>
>>>>>>>> wrote:
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> To provide additional context, the KafkaIO ReadAll
>>>>>>>> transform takes a PCollection<KafkaSourceDescriptor>. This
>>>>>>>> KafkaSourceDescriptor is a POJO that contains the configurable 
>>>>>>>> parameters
>>>>>>>> for reading from Kafka. This is different from the pattern that Ismael
>>>>>>>> listed because they take PCollection<Read> as input and the Read is the
>>>>>>>> same as the Read PTransform class used for the non read all case.
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> The KafkaSourceDescriptor does lead to duplication since
>>>>>>>> parameters used to configure the transform have to be copied over to 
>>>>>>>> the
>>>>>>>> source descriptor but decouples how a transform is specified from the
>>>>>>>> object that describes what needs to be done. I believe Ismael's point 
>>>>>>>> is
>>>>>>>> that we wouldn't need such a decoupling.
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> Another area that hasn't been discussed and I believe is a
>>>>>>>> non-issue is that the Beam Java SDK has the most IO connectors and we 
>>>>>>>> would
>>>>>>>> want to use the IO implementations within Beam Go and Beam Python. This
>>>>>>>> brings in its own set of issues related to versioning and 
>>>>>>>> compatibility for
>>>>>>>> the wire format and how one parameterizes such transforms. The wire 
>>>>>>>> format
>>>>>>>> issue can be solved with either approach by making sure that the cross
>>>>>>>> language expansion always takes the well known format (whatever it may 
>>>>>>>> be)
>>>>>>>> and converts it into Read/KafkaSourceDescriptor/... object that is then
>>>>>>>> passed to the ReadAll transform. Boyuan has been looking to make the
>>>>>>>> KafkaSourceDescriptor have a schema so it can be represented as a row 
>>>>>>>> and
>>>>>>>> this can be done easily using the AutoValue integration (I don't 
>>>>>>>> believe
>>>>>>>> there is anything preventing someone from writing a schema row -> Read 
>>>>>>>> ->
>>>>>>>> row adapter or also using the AutoValue configuration if the transform 
>>>>>>>> is
>>>>>>>> also an AutoValue).
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> I would be more for the code duplication and separation of
>>>>>>>> concerns provided by using a different object to represent the 
>>>>>>>> contents of
>>>>>>>> the PCollection from the pipeline construction time PTransform.
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <
>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> Hi Ismael,
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> Thanks for taking this on. Have you considered an
>>>>>>>> approach similar (or dual) to FileIO.write(), where we in a sense also
>>>>>>>> have to configure a dynamic number of different IO transforms of the same
>>>>>>>> type (file writes)?
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> E.g. how in this example we configure many aspects of
>>>>>>>> many file writes:
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
>>>>>>>> >>>>>>>>>      .by(Transaction::getType)
>>>>>>>> >>>>>>>>>      // Convert the data to be written to CSVSink
>>>>>>>> >>>>>>>>>      .via(tx -> tx.getType().toFields(tx),
>>>>>>>> >>>>>>>>>           type -> new CSVSink(type.getFieldNames()))
>>>>>>>> >>>>>>>>>      .to(".../path/to/")
>>>>>>>> >>>>>>>>>      .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> we could do something similar for many JdbcIO reads:
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> // user-specific type from which all the read parameters can be inferred
>>>>>>>> >>>>>>>>> PCollection<Bar> bars;
>>>>>>>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>>>>>>>> >>>>>>>>>   .fromQuery(bar -> ...compute query for this bar...)
>>>>>>>> >>>>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
>>>>>>>> >>>>>>>>>   .withBatchSize(bar -> ...compute batch size for this bar...)
>>>>>>>> >>>>>>>>>   ...etc);
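
The "every setting is a function of the input element" style proposed above can be modelled in plain Java. This is a hedged sketch only: `DynamicReadAll`, `fromQuery`, `withBatchSize`, `plan`, and the default batch size of 1000 are hypothetical names and values chosen for illustration, and none of this is the JdbcIO API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical builder, not JdbcIO: each setting is computed per input element.
final class DynamicReadAll<I> {
  private final Function<I, String> queryFn;
  private final Function<I, Integer> batchSizeFn;

  private DynamicReadAll(Function<I, String> queryFn, Function<I, Integer> batchSizeFn) {
    this.queryFn = queryFn;
    this.batchSizeFn = batchSizeFn;
  }

  static <I> DynamicReadAll<I> fromQuery(Function<I, String> queryFn) {
    // 1000 is an arbitrary default batch size used only in this sketch.
    return new DynamicReadAll<>(queryFn, input -> 1000);
  }

  DynamicReadAll<I> withBatchSize(Function<I, Integer> batchSizeFn) {
    return new DynamicReadAll<>(queryFn, batchSizeFn);
  }

  // Resolves the per-element configuration; a real transform would run the read here.
  List<String> plan(List<I> inputs) {
    List<String> planned = new ArrayList<>();
    for (I input : inputs) {
      planned.add(queryFn.apply(input) + " [batch=" + batchSizeFn.apply(input) + "]");
    }
    return planned;
  }
}

final class DynamicReadAllDemo {
  public static void main(String[] args) {
    DynamicReadAll<String> readAll =
        DynamicReadAll.<String>fromQuery(table -> "SELECT * FROM " + table)
            .withBatchSize(table -> table.length() * 100);
    System.out.println(readAll.plan(Arrays.asList("users", "orders")));
  }
}
```

Here the transform author decides which settings accept a function, so only those settings can vary per element.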
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <
>>>>>>>> ieme...@gmail.com> wrote:
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> Hello,
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> (my excuses for the long email but this requires context)
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> As part of the move from Source based IOs to DoFn based
>>>>>>>> ones, one pattern
>>>>>>>> >>>>>>>>>> emerged due to the composable nature of DoFn. The idea
>>>>>>>> is to have a different
>>>>>>>> >>>>>>>>>> kind of composable reads where we take a PCollection of
>>>>>>>> different sorts of
>>>>>>>> >>>>>>>>>> intermediate specifications e.g. tables, queries, etc,
>>>>>>>> for example:
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> JdbcIO:
>>>>>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends
>>>>>>>> >>>>>>>>>> PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> RedisIO:
>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<String>,
>>>>>>>> PCollection<KV<String, String>>>
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> HBaseIO:
>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>,
>>>>>>>> PCollection<Result>>
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> These patterns enabled richer use cases like doing
>>>>>>>> multiple queries in the same
>>>>>>>> >>>>>>>>>> Pipeline, querying based on key patterns or querying
>>>>>>>> from multiple tables at the
>>>>>>>> >>>>>>>>>> same time but came with some maintenance issues:
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> - We ended up needing to add to the ReadAll transforms
>>>>>>>> the parameters for
>>>>>>>> >>>>>>>>>>   missing information so we ended up with lots of
>>>>>>>> duplicated `with` methods and
>>>>>>>> >>>>>>>>>>   error-prone code copied from the Read transforms into the
>>>>>>>> ReadAll transforms.
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> - When you require new parameters you have to expand the
>>>>>>>> input parameters of the
>>>>>>>> >>>>>>>>>>   intermediary specification into something that
>>>>>>>> resembles the full `Read`
>>>>>>>> >>>>>>>>>>   definition. For example, imagine you want to read from
>>>>>>>> multiple tables or
>>>>>>>> >>>>>>>>>>   servers as part of the same pipeline but this was not
>>>>>>>> in the intermediate
>>>>>>>> >>>>>>>>>>   specification: you end up adding those extra methods
>>>>>>>> (duplicating more code)
>>>>>>>> >>>>>>>>>>   just to get close to the full Read spec.
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> - If new parameters are added to the Read method we end
>>>>>>>> up adding them
>>>>>>>> >>>>>>>>>>   systematically to the ReadAll transform too so they
>>>>>>>> are taken into account.
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> Due to these issues I recently did a change to test a
>>>>>>>> new approach that is
>>>>>>>> >>>>>>>>>> simpler, more complete and maintainable. The code became:
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> HBaseIO:
>>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>,
>>>>>>>> PCollection<Result>>
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> With this approach users gain benefits of improvements
>>>>>>>> on parameters of normal
>>>>>>>> >>>>>>>>>> Read because they can count on the full Read parameters.
>>>>>>>> But of course there are
>>>>>>>> >>>>>>>>>> some minor caveats:
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> 1. You need to push some information into normal Reads
>>>>>>>> for example
>>>>>>>> >>>>>>>>>>    partition boundaries information or Restriction
>>>>>>>> information (in the SDF
>>>>>>>> >>>>>>>>>>    case).  Notice that this consistent approach of
>>>>>>>> ReadAll produces a simple
>>>>>>>> >>>>>>>>>>    pattern that ends up being almost reusable between
>>>>>>>> IOs (e.g. the non-SDF
>>>>>>>> >>>>>>>>>>    case):
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>>   public static class ReadAll
>>>>>>>> >>>>>>>>>>       extends PTransform<PCollection<Read>, PCollection<SolrDocument>> {
>>>>>>>> >>>>>>>>>>     @Override
>>>>>>>> >>>>>>>>>>     public PCollection<SolrDocument> expand(PCollection<Read> input) {
>>>>>>>> >>>>>>>>>>       return input
>>>>>>>> >>>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>>>>>>> >>>>>>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>>>>>>>> >>>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>>>>>>> >>>>>>>>>>     }
>>>>>>>> >>>>>>>>>>   }
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> 2. If you are using generic types for the results of
>>>>>>>> ReadAll you must have the
>>>>>>>> >>>>>>>>>>    Coders used in its definition and require consistent
>>>>>>>> types from the data
>>>>>>>> >>>>>>>>>>    sources. In practice this means we need to add extra
>>>>>>>> withCoder method(s) on
>>>>>>>> >>>>>>>>>>    ReadAll but not the full specs.
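
The Split, Reshuffle, Read shape from caveat 1 can be modelled in plain Java without Beam to show what each stage contributes. This is a sketch under assumptions: `ReadSpec`, the integer key range, the `chunk` parameter, and the seeded shuffle are hypothetical stand-ins. In a real pipeline the reshuffle redistributes work across workers, which a local list shuffle only imitates.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical read spec: a table plus a half-open key range [start, end).
final class ReadSpec {
  final String table;
  final int start;
  final int end;

  ReadSpec(String table, int start, int end) {
    this.table = table;
    this.start = start;
    this.end = end;
  }
}

final class SplitShuffleReadSketch {
  // "Split": break one spec into sub-ranges of at most `chunk` keys (chunk must be > 0).
  static List<ReadSpec> split(ReadSpec spec, int chunk) {
    List<ReadSpec> parts = new ArrayList<>();
    for (int s = spec.start; s < spec.end; s += chunk) {
      parts.add(new ReadSpec(spec.table, s, Math.min(s + chunk, spec.end)));
    }
    return parts;
  }

  // "Read": emit one record per key in the range.
  static List<String> read(ReadSpec spec) {
    List<String> out = new ArrayList<>();
    for (int k = spec.start; k < spec.end; k++) {
      out.add(spec.table + ":" + k);
    }
    return out;
  }

  static List<String> readAll(List<ReadSpec> specs, int chunk, long seed) {
    List<ReadSpec> parts = new ArrayList<>();
    for (ReadSpec spec : specs) {
      parts.addAll(split(spec, chunk));
    }
    // "Reshuffle": reorder the sub-ranges so no single input spec is read as one block.
    Collections.shuffle(parts, new Random(seed));
    List<String> out = new ArrayList<>();
    for (ReadSpec part : parts) {
      out.addAll(read(part));
    }
    return out;
  }

  public static void main(String[] args) {
    List<ReadSpec> specs = new ArrayList<>();
    specs.add(new ReadSpec("users", 0, 5));
    specs.add(new ReadSpec("orders", 0, 3));
    System.out.println(readAll(specs, 2, 42L));
  }
}
```

Because the three stages only depend on the spec type, the same skeleton carries over from one IO to another with a different split and read step.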
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow this
>>>>>>>> ReadAll pattern. RedisIO
>>>>>>>> >>>>>>>>>> and CassandraIO have already WIP PRs to do so. So I
>>>>>>>> wanted to bring this subject
>>>>>>>> >>>>>>>>>> to the mailing list to see your opinions, and if you see
>>>>>>>> any sort of issues that
>>>>>>>> >>>>>>>>>> we might be missing with this idea.
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> Also I would like to see if we have consensus to start
>>>>>>>> using consistently the
>>>>>>>> >>>>>>>>>> terminology of ReadAll transforms based on Read and the
>>>>>>>> readAll() method for new
>>>>>>>> >>>>>>>>>> IOs (at this point probably undoing this in the only
>>>>>>>> remaining inconsistent
>>>>>>>> >>>>>>>>>> place in JdbcIO might not be a good idea but apart from
>>>>>>>> this we should be ok).
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> I mention this because the recent PR on KafkaIO based on
>>>>>>>> SDF is doing something
>>>>>>>> >>>>>>>>>> similar to the old pattern but is called ReadAll, and
>>>>>>>> maybe it is worth being
>>>>>>>> >>>>>>>>>> consistent for the benefit of users.
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> Regards,
>>>>>>>> >>>>>>>>>> Ismaël
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>>
>>>>>>>>
