The discussion of cross-language transforms is a slight tangent here.
But I think, in general, it's great if we can use existing transforms (for
example, IO connectors) as cross-language transforms without having to
build more composites (whether in ExternalTransformBuilders or in user
pipelines) just to make them cross-language compatible. A future
cross-language compatible SchemaCoder might help (assuming that works for
the Read transform), but I'm not sure we have a good idea of when we'll get
to that state.
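
To make the "extra composite" concrete, here is a rough, library-free sketch
of the conversion step such a composite has to perform. It is only an
illustration under assumptions: Map<String, Object> stands in for a portable
Row, and SourceDescriptor, its fields and fromRow are hypothetical names, not
Beam APIs.

```java
import java.util.HashMap;
import java.util.Map;

public class RowToDescriptorSketch {
  /** Hypothetical source descriptor; the fields are illustrative only. */
  static final class SourceDescriptor {
    final String topic;
    final int partition;
    final Long startOffset; // null means "not set"

    SourceDescriptor(String topic, int partition, Long startOffset) {
      this.topic = topic;
      this.partition = partition;
      this.startOffset = startOffset;
    }
  }

  /** Converts the portable, row-like form into the language-bound descriptor. */
  static SourceDescriptor fromRow(Map<String, Object> row) {
    Object topic = row.get("topic");
    Object partition = row.get("partition");
    if (!(topic instanceof String) || !(partition instanceof Integer)) {
      throw new IllegalArgumentException("row needs a String topic and an Integer partition");
    }
    // startOffset is optional; a missing key simply yields null.
    return new SourceDescriptor((String) topic, (Integer) partition, (Long) row.get("startOffset"));
  }

  public static void main(String[] args) {
    Map<String, Object> row = new HashMap<>();
    row.put("topic", "events");
    row.put("partition", 3);
    SourceDescriptor d = fromRow(row);
    System.out.println(d.topic + "/" + d.partition + " startOffset=" + d.startOffset);
  }
}
```

Every IO that wants to be usable across languages needs some step of this
shape today, which is the duplication I would like us to avoid.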

Thanks,
Cham

On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang <boyu...@google.com> wrote:

> For unbounded SDF in Kafka, we also consider upgrade/downgrade
> compatibility in the pipeline update scenario (for a detailed discussion,
> please refer to
> https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E).
> To obtain that compatibility, the input of the read SDF needs to be
> schema-aware.
>
> Thus the major constraint of mapping KafkaSourceDescriptor to
> PCollection<Read> is that KafkaIO.Read also needs to be schema-aware,
> otherwise pipeline updates might fail unnecessarily. Looking into
> KafkaIO.Read, not all necessary fields are compatible with schemas, for
> example, SerializableFunction.
>
> I'm somewhat confused about why ReadAll<Read, OutputT> is a common pattern
> for SDF-based IO. The Read can be a common pattern because the input is
> always a PBegin. But for an SDF-based IO, the input can be anything. By
> using Read as input, we will still have the maintenance cost when the SDF IO
> supports a new field but Read doesn't consume it. For example, we are
> discussing adding endOffset and endReadTime to KafkaSourceDescriptor, which
> are not used in KafkaIO.Read.
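
As a rough illustration of the point above, a descriptor can carry fields that
a Read builder has no use for. This is only a sketch under assumptions: apart
from the names endOffset and endReadTime, which come from the message above,
the class, the field types (epoch millis for the time) and hasEndBound are
hypothetical and not the actual KafkaSourceDescriptor.

```java
import java.util.Optional;

public class DescriptorFieldsSketch {
  /** Hypothetical descriptor; only the names endOffset and endReadTime come from the thread. */
  static final class SourceDescriptor {
    final String topic;
    final int partition;
    final Optional<Long> endOffset;         // assumed: offset at which reading stops
    final Optional<Long> endReadTimeMillis; // assumed: epoch millis at which reading stops

    SourceDescriptor(
        String topic, int partition, Optional<Long> endOffset, Optional<Long> endReadTimeMillis) {
      this.topic = topic;
      this.partition = partition;
      this.endOffset = endOffset;
      this.endReadTimeMillis = endReadTimeMillis;
    }

    /** True if either optional end bound is set on this descriptor. */
    boolean hasEndBound() {
      return endOffset.isPresent() || endReadTimeMillis.isPresent();
    }
  }

  public static void main(String[] args) {
    SourceDescriptor open =
        new SourceDescriptor("events", 0, Optional.empty(), Optional.empty());
    SourceDescriptor closed =
        new SourceDescriptor("events", 0, Optional.of(100L), Optional.empty());
    System.out.println(open.hasEndBound() + " " + closed.hasEndBound()); // false true
  }
}
```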
>
> On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía <ieme...@gmail.com> wrote:
>
>> We forgot to mention (5) External.Config used in cross-lang, see KafkaIO
>> ExternalTransformBuilder. This approach is the predecessor of (4) and
>> probably a really good candidate to be replaced by the Row based
>> Configuration Boyuan is envisioning (so good to be aware of this).
>>
>> Thanks for the clear explanation, Luke; you mention the real issue(s). All
>> the approaches discussed so far could in the end be easily transformed to
>> produce a PCollection<Read>, and those Read elements could be read by the
>> generic ReadAll transform. Notice that this can be internal in some IOs,
>> e.g. KafkaIO, if they decide not to expose it. I am not saying that we
>> should force every IO to support ReadAll in its public API, but if we do,
>> it is probably a good idea to be consistent in naming the transform that
>> expects an input PCollection<Read>. Also notice that using it will save us
>> from the maintenance issues discussed in my previous email.
>>
>> Back to the main concern: the consequences of expansion based on Read. So
>> far I have not seen consequences for the splitting part, which maps really
>> nicely assuming the partition info / Restriction is available as part of
>> Read. So far there are no serialization issues because Beam is already
>> enforcing this. Notice that ReadAll expansion is almost ‘equivalent’ to a
>> poor man's SDF, at least for the Bounded case (see the code in my previous
>> email). For the other points:
>>
>> > a) Do all properties set on a Read apply to the ReadAll? For example, the
>> > Kafka Read implementation allows you to set the key and value deserializers
>> > which are also used to dictate the output PCollection type. It also allows
>> > you to set how the watermark should be computed. Technically a user may
>> > want the watermark computation to be configurable per Read and they may
>> > also want an output type which is polymorphic (e.g.
>> > PCollection<Serializable>).
>>
>> Most of the time they do, but for parametric types we cannot support
>> different types in the outputs of the Read, or at least I did not find how
>> to do so (is there a way to use multiple output Coders in Beam?). We saw
>> this in CassandraIO and we were discussing explicitly adding these Coder or
>> Serializer specific methods to the ReadAll transform. This is less nice
>> because it will imply some repeated methods, but it is still a compromise
>> to gain the other advantages. I suppose the watermark case you mention is
>> similar because you may want the watermark to behave differently in each
>> Read and we probably don’t support this, so it corresponds to the
>> polymorphic category.
>>
>> > b) Read extends PTransform which brings its own object modelling concerns.
>>
>> > During the implementations of ReadAll(PCollection<Read>), was it discovered
>> > that some properties became runtime errors or were ignored if they were set?
>> > If no, then the code deduplication is likely worth it because we also get a
>> > lot of javadoc deduplication, but if yes is this an acceptable user
>> > experience?
>>
>> No, not so far. This is an interesting part: notice that the Read
>> translation ends up delegating the read bits to the ReadFn part of ReadAll,
>> so the ReadFn is the real read and must be aware of and use all the
>> parameters.
>>
>>     @Override
>>     public PCollection<SolrDocument> expand(PBegin input) {
>>       return input.apply("Create", Create.of(this)).apply("ReadAll", readAll());
>>     }
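
The delegation in the snippet above can be modelled without Beam. The
following is only a sketch under assumptions: Read is a hypothetical spec
holding a key range, a list stands in for a PCollection, and
Collections.shuffle stands in for Reshuffle. It shows that a single read is
the "read all" path applied to a one-element input.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ReadAllSketch {
  /** Hypothetical read spec: a key range [start, end) standing in for a real Read. */
  static final class Read {
    final int start;
    final int end;

    Read(int start, int end) {
      this.start = start;
      this.end = end;
    }

    /** Mirrors expand(PBegin): wrap this single spec and delegate to readAll. */
    List<Integer> expand() {
      return readAll(Collections.singletonList(this));
    }
  }

  /** "Split": cut one spec into sub-ranges of at most chunk keys. */
  static List<Read> split(Read read, int chunk) {
    List<Read> parts = new ArrayList<>();
    for (int s = read.start; s < read.end; s += chunk) {
      parts.add(new Read(s, Math.min(s + chunk, read.end)));
    }
    return parts;
  }

  /** "Read": the only place that actually produces records. */
  static List<Integer> read(Read read) {
    List<Integer> out = new ArrayList<>();
    for (int k = read.start; k < read.end; k++) {
      out.add(k);
    }
    return out;
  }

  /** Split, then shuffle, then read, for any number of input specs. */
  static List<Integer> readAll(List<Read> reads) {
    List<Read> parts = new ArrayList<>();
    for (Read r : reads) {
      parts.addAll(split(r, 2));
    }
    Collections.shuffle(parts); // stand-in for Reshuffle; output order is not guaranteed
    List<Integer> out = new ArrayList<>();
    for (Read p : parts) {
      out.addAll(read(p));
    }
    return out;
  }

  public static void main(String[] args) {
    List<Integer> records = new Read(0, 5).expand();
    Collections.sort(records);
    System.out.println(records); // [0, 1, 2, 3, 4]
  }
}
```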
>>
>> I might be missing something for the Unbounded SDF case, which is the only
>> case we have not explored so far. I think one easy way to see the
>> limitations would be, in the ongoing KafkaIO SDF based implementation, to
>> try to map KafkaSourceDescriptor to the extra PCollection<Read> and put the
>> Read logic in the ReadAll with the SDF to see which constraints we hit. The
>> polymorphic ones will be there for sure; maybe others will appear (not
>> sure). However, it would be interesting to see if we have a real gain on
>> the maintenance points, but let’s not forget that KafkaIO has a LOT of
>> knobs, so the generic implementation could probably be relatively complex.
>>
>>
>>
>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <lc...@google.com> wrote:
>> >
>> > I had mentioned that approach 1 and approach 2 work for cross language.
>> > The difference being that the cross language transform would take a well
>> > known definition and convert it to the Read transform. A normal user would
>> > have a pipeline that would look like:
>> > 1: PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) -> PCollection<Output>
>> >
>> > And in the cross language case this would look like:
>> > 1: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to Read) -> PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
>> > 2: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to SourceDescriptor) -> PCollection<SourceDescriptor> -> PTransform(ReadAll) -> PCollection<Output>*
>> > * note that PTransform(Convert Row to SourceDescriptor) only exists
>> > since we haven't solved how to use schemas with language bound types in a
>> > cross language way. SchemaCoder isn't portable but RowCoder is which is why
>> > the conversion step exists. We could have a solution for this at some point
>> > in time.
>> >
>> > My concern with using Read was around:
>> > a) Do all properties set on a Read apply to the ReadAll? For example,
>> > the Kafka Read implementation allows you to set the key and value
>> > deserializers which are also used to dictate the output PCollection type.
>> > It also allows you to set how the watermark should be computed. Technically
>> > a user may want the watermark computation to be configurable per Read and
>> > they may also want an output type which is polymorphic (e.g.
>> > PCollection<Serializable>).
>> > b) Read extends PTransform which brings its own object modelling
>> > concerns.
>> >
>> > During the implementations of ReadAll(PCollection<Read>), was it
>> > discovered that some properties became runtime errors or were ignored if
>> > they were set? If no, then the code deduplication is likely worth it
>> > because we also get a lot of javadoc deduplication, but if yes is this an
>> > acceptable user experience?
>> >
>> >
>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <
>> aromanenko....@gmail.com> wrote:
>> >>
>> >> I believe that the initial goal of unifying ReadAll as a general
>> >> "PTransform<PCollection<Read>, PCollection<OutputType>>" was to reduce the
>> >> amount of code duplication and the error-prone approach related to it. It
>> >> makes a lot of sense since we usually have all the needed configuration
>> >> set in Read objects and, as Ismaël mentioned, ReadAll will consist mostly
>> >> of only Split-Shuffle-Read stages. So this case usually can be unified by
>> >> using PCollection<Read> as input.
>> >>
>> >> On the other hand, we have another need to use Java IOs as
>> >> cross-language transforms (as Luke described), which seems to have only
>> >> part in common with the previous pattern of ReadAll usage.
>> >>
>> >> I’d be more in favour of having only one concept of read configuration
>> >> for all needs, but it seems that’s not easy, so I’d be more in favour of
>> >> Luke and Boyuan's approach with schema. Though maybe ReadAll is not a very
>> >> suitable name in this case because it can cause some confusion with the
>> >> previous pattern of ReadAll uses.
>> >>
>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang <boyu...@google.com> wrote:
>> >>
>> >> Sorry for the typo. I mean I think we can go with (3) and (4): use the
>> data type that is schema-aware as the input of ReadAll.
>> >>
>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <boyu...@google.com>
>> wrote:
>> >>>
>> >>> Thanks for the summary, Cham!
>> >>>
>> >>> I think we can go with (2) and (4): use the data type that is
>> schema-aware as the input of ReadAll.
>> >>>
>> >>> Converting Read into ReadAll helps us to stick with SDF-like IO. But
>> >>> only having (3) is not enough to solve the problem of using ReadAll in
>> >>> the x-lang case.
>> >>>
>> >>> The key point of ReadAll is that the input type of ReadAll should be
>> >>> able to cross language boundaries and stay compatible across
>> >>> updates/downgrades. After investigating some possibilities (pure Java POJO
>> >>> with custom coder, protobuf, row/schema) in Kafka usage, we find that
>> >>> row/schema fits our needs best. Here comes (4). I believe that using Read
>> >>> as input of ReadAll makes sense in some cases, but I also think not all IOs
>> >>> have the same need. I would treat Read as a special type as long as the
>> >>> Read is schema-aware.
>> >>>
>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <
>> chamik...@google.com> wrote:
>> >>>>
>> >>>> I see. So it seems like there are three options discussed so far
>> when it comes to defining source descriptors for ReadAll type transforms
>> >>>>
>> >>>> (1) Use Read PTransform as the element type of the input PCollection
>> >>>> (2) Use a POJO that describes the source as the data element of the
>> input PCollection
>> >>>> (3) Provide a converter as a function to the Read transform which
>> essentially will convert it to a ReadAll (what Eugene mentioned)
>> >>>>
>> >>>> I feel like (3) is more suitable for a related set of source
>> descriptions such as files.
>> >>>> (1) will allow the most code reuse, but it seems it will make it hard to
>> >>>> use the ReadAll transform as a cross-language transform and will break the
>> >>>> separation of construction time and runtime constructs
>> >>>> (2) could result in less code reuse if we are not careful, but will make
>> >>>> the transform easier to use as a cross-language transform without
>> >>>> additional modifications
>> >>>>
>> >>>> Also, with SDF, we can create ReadAll-like transforms that are more
>> efficient. So we might be able to just define all sources in that format
>> and make Read transforms just an easy to use composite built on top of that
>> (by adding a preceding Create transform).
>> >>>>
>> >>>> Thanks,
>> >>>> Cham
>> >>>>
>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <lc...@google.com> wrote:
>> >>>>>
>> >>>>> I believe we do require PTransforms to be serializable since
>> anonymous DoFns typically capture the enclosing PTransform.
>> >>>>>
>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <
>> chamik...@google.com> wrote:
>> >>>>>>
>> >>>>>> Seems like Read in PCollection<Read> refers to a transform, at
>> least here:
>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>> >>>>>>
>> >>>>>> I'm in favour of separating construction time transforms from
>> >>>>>> execution time data objects that we store in PCollections as Luke
>> >>>>>> mentioned. Also, we don't guarantee that PTransform is serializable so
>> >>>>>> users have the additional complexity of providing a coder whenever a
>> >>>>>> PTransform is used as a data object.
>> >>>>>> Also, agree with Boyuan that using simple Java objects that are
>> >>>>>> convertible to Beam Rows allows us to make these transforms available to
>> >>>>>> other SDKs through the cross-language transforms. Using transforms or
>> >>>>>> complex sources as data objects will probably make this difficult.
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>> Cham
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <boyu...@google.com>
>> wrote:
>> >>>>>>>
>> >>>>>>> Hi Ismael,
>> >>>>>>>
>> >>>>>>> I think the ReadAll in the IO connector refers to the IO with SDF
>> >>>>>>> implementation regardless of the type of input, whereas Read refers to
>> >>>>>>> UnboundedSource. One major pushback against using KafkaIO.Read as the
>> >>>>>>> source description is that not all configurations of KafkaIO.Read are
>> >>>>>>> meaningful to populate at execution time. Also, when thinking about
>> >>>>>>> x-lang usage, the source description needs to cross language
>> >>>>>>> boundaries. As Luke mentioned, it's quite easy to infer a Schema from an
>> >>>>>>> AutoValue object: KafkaSourceDescription.java. Then the coder of this
>> >>>>>>> schema-aware object will be a SchemaCoder. When crossing language
>> >>>>>>> boundaries, it's also easy to convert a Row into the source
>> >>>>>>> description: Convert.fromRows.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <lc...@google.com>
>> wrote:
>> >>>>>>>>
>> >>>>>>>> To provide additional context, the KafkaIO ReadAll transform
>> takes a PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is a
>> POJO that contains the configurable parameters for reading from Kafka. This
>> is different from the pattern that Ismael listed because they take
>> PCollection<Read> as input and the Read is the same as the Read PTransform
>> class used for the non read all case.
>> >>>>>>>>
>> >>>>>>>> The KafkaSourceDescriptor does lead to duplication since
>> parameters used to configure the transform have to be copied over to the
>> source descriptor but decouples how a transform is specified from the
>> object that describes what needs to be done. I believe Ismael's point is
>> that we wouldn't need such a decoupling.
>> >>>>>>>>
>> >>>>>>>> Another area that hasn't been discussed and I believe is a
>> non-issue is that the Beam Java SDK has the most IO connectors and we would
>> want to use the IO implementations within Beam Go and Beam Python. This
>> brings in its own set of issues related to versioning and compatibility for
>> the wire format and how one parameterizes such transforms. The wire format
>> issue can be solved with either approach by making sure that the cross
>> language expansion always takes the well known format (whatever it may be)
>> and converts it into Read/KafkaSourceDescriptor/... object that is then
>> passed to the ReadAll transform. Boyuan has been looking to make the
>> KafkaSourceDescriptor have a schema so it can be represented as a row and
>> this can be done easily using the AutoValue integration (I don't believe
>> there is anything preventing someone from writing a schema row -> Read ->
>> row adapter or also using the AutoValue configuration if the transform is
>> also an AutoValue).
>> >>>>>>>>
>> >>>>>>>> I would be more for the code duplication and separation of
>> concerns provided by using a different object to represent the contents of
>> the PCollection from the pipeline construction time PTransform.
>> >>>>>>>>
>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <
>> kirpic...@google.com> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Hi Ismael,
>> >>>>>>>>>
>> >>>>>>>>> Thanks for taking this on. Have you considered an approach
>> >>>>>>>>> similar (or dual) to FileIO.write(), where we in a sense also have to
>> >>>>>>>>> configure a dynamic number of different IO transforms of the same type
>> >>>>>>>>> (file writes)?
>> >>>>>>>>>
>> >>>>>>>>> E.g. how in this example we configure many aspects of many file
>> writes:
>> >>>>>>>>>
>> >>>>>>>>> transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
>> >>>>>>>>>      .by(Transaction::getType)
>> >>>>>>>>>      .via(tx -> tx.getType().toFields(tx),  // Convert the data to be written to CSVSink
>> >>>>>>>>>           type -> new CSVSink(type.getFieldNames()))
>> >>>>>>>>>      .to(".../path/to/")
>> >>>>>>>>>      .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
>> >>>>>>>>>
>> >>>>>>>>> we could do something similar for many JdbcIO reads:
>> >>>>>>>>>
>> >>>>>>>>> PCollection<Bar> bars;  // user-specific type from which all the read parameters can be inferred
>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>> >>>>>>>>>   .fromQuery(bar -> ...compute query for this bar...)
>> >>>>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
>> >>>>>>>>>   .withBatchSize(bar -> ...compute batch size for this bar...)
>> >>>>>>>>>   ...etc);
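
The function-based configuration proposed above can be sketched without Beam
or JDBC. This is only an illustration under assumptions: ReadAll here is a
hypothetical plain class, a list stands in for a PCollection, and the
"database" is faked by echoing the query string.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

public class FunctionConfiguredReadAllSketch {
  /** Hypothetical readAll configured by per-element functions instead of a fixed spec type. */
  static final class ReadAll<ParamT, OutT> {
    private final Function<ParamT, String> queryFn;
    private final BiFunction<ParamT, String, OutT> mapper;

    ReadAll(Function<ParamT, String> queryFn, BiFunction<ParamT, String, OutT> mapper) {
      this.queryFn = queryFn;
      this.mapper = mapper;
    }

    /** For each input element: derive its query, "run" it, and map the result. */
    List<OutT> apply(List<ParamT> params) {
      List<OutT> out = new ArrayList<>();
      for (ParamT p : params) {
        // Fake result: a real implementation would execute the query here.
        String fakeResult = "result of [" + queryFn.apply(p) + "]";
        out.add(mapper.apply(p, fakeResult));
      }
      return out;
    }
  }

  public static void main(String[] args) {
    ReadAll<Integer, String> readAll =
        new ReadAll<Integer, String>(
            id -> "SELECT * FROM t WHERE id = " + id,
            (id, result) -> id + " -> " + result);
    System.out.println(readAll.apply(Arrays.asList(1, 2)));
  }
}
```

The input element type stays entirely user-defined; all read parameters are
derived from it by the supplied functions.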
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <ieme...@gmail.com>
>> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>> Hello,
>> >>>>>>>>>>
>> >>>>>>>>>> (my excuses for the long email but this requires context)
>> >>>>>>>>>>
>> >>>>>>>>>> As part of the move from Source based IOs to DoFn based ones, one pattern
>> >>>>>>>>>> emerged due to the composable nature of DoFn. The idea is to have a different
>> >>>>>>>>>> kind of composable reads where we take a PCollection of different sorts of
>> >>>>>>>>>> intermediate specifications e.g. tables, queries, etc, for example:
>> >>>>>>>>>>
>> >>>>>>>>>> JdbcIO:
>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends
>> >>>>>>>>>> PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>> >>>>>>>>>>
>> >>>>>>>>>> RedisIO:
>> >>>>>>>>>> ReadAll extends PTransform<PCollection<String>, PCollection<KV<String, String>>>
>> >>>>>>>>>>
>> >>>>>>>>>> HBaseIO:
>> >>>>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>, PCollection<Result>>
>> >>>>>>>>>>
>> >>>>>>>>>> These patterns enabled richer use cases like doing multiple queries in the same
>> >>>>>>>>>> Pipeline, querying based on key patterns or querying from multiple tables at the
>> >>>>>>>>>> same time but came with some maintenance issues:
>> >>>>>>>>>>
>> >>>>>>>>>> - We ended up needing to add to the ReadAll transforms the parameters for
>> >>>>>>>>>>   missing information, so we ended up with lots of duplicated with* methods and
>> >>>>>>>>>>   error-prone code copied from the Read transforms into the ReadAll transforms.
>> >>>>>>>>>>
>> >>>>>>>>>> - When you require new parameters you have to expand the input parameters of the
>> >>>>>>>>>>   intermediary specification into something that resembles the full `Read`
>> >>>>>>>>>>   definition. For example, imagine you want to read from multiple tables or
>> >>>>>>>>>>   servers as part of the same pipeline but this was not in the intermediate
>> >>>>>>>>>>   specification: you end up adding those extra methods (duplicating more code)
>> >>>>>>>>>>   just to get close to the full Read spec.
>> >>>>>>>>>>
>> >>>>>>>>>> - If new parameters are added to the Read method we end up adding them
>> >>>>>>>>>>   systematically to the ReadAll transform too so they are taken into account.
>> >>>>>>>>>>
>> >>>>>>>>>> Due to these issues I recently made a change to test a new approach that is
>> >>>>>>>>>> simpler, more complete and maintainable. The code became:
>> >>>>>>>>>>
>> >>>>>>>>>> HBaseIO:
>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>
>> >>>>>>>>>>
>> >>>>>>>>>> With this approach users benefit from improvements to the parameters of the normal
>> >>>>>>>>>> Read because they can count on the full Read parameters. But of course there are
>> >>>>>>>>>> some minor caveats:
>> >>>>>>>>>>
>> >>>>>>>>>> 1. You need to push some information into normal Reads, for example
>> >>>>>>>>>>    partition boundaries information or Restriction information (in the SDF
>> >>>>>>>>>>    case). Notice that this consistent approach of ReadAll produces a simple
>> >>>>>>>>>>    pattern that ends up being almost reusable between IOs (e.g. the non-SDF
>> >>>>>>>>>>    case):
>> >>>>>>>>>>
>> >>>>>>>>>>   public static class ReadAll extends PTransform<PCollection<Read>,
>> >>>>>>>>>>       PCollection<SolrDocument>> {
>> >>>>>>>>>>     @Override
>> >>>>>>>>>>     public PCollection<SolrDocument> expand(PCollection<Read> input) {
>> >>>>>>>>>>       return input
>> >>>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>> >>>>>>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>> >>>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>> >>>>>>>>>>     }
>> >>>>>>>>>>   }
>> >>>>>>>>>>
>> >>>>>>>>>> 2. If you are using generic types for the results of ReadAll you must have the
>> >>>>>>>>>>    Coders used in its definition and require consistent types from the data
>> >>>>>>>>>>    sources; in practice this means we need to add extra withCoder method(s) on
>> >>>>>>>>>>    ReadAll but not the full specs.
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow this ReadAll pattern. RedisIO
>> >>>>>>>>>> and CassandraIO already have WIP PRs to do so. So I wanted to bring this subject
>> >>>>>>>>>> to the mailing list to hear your opinions, and to see if you spot any issues that
>> >>>>>>>>>> we might be missing with this idea.
>> >>>>>>>>>>
>> >>>>>>>>>> Also I would like to see if we have consensus to start consistently using the
>> >>>>>>>>>> terminology of ReadAll transforms based on Read and the readAll() method for new
>> >>>>>>>>>> IOs (at this point undoing this in the only remaining inconsistent
>> >>>>>>>>>> place, JdbcIO, might not be a good idea, but apart from this we should be ok).
>> >>>>>>>>>>
>> >>>>>>>>>> I mention this because the recent PR on KafkaIO based on SDF is doing something
>> >>>>>>>>>> similar to the old pattern but is called ReadAll, and maybe it is worth being
>> >>>>>>>>>> consistent for the benefit of users.
>> >>>>>>>>>>
>> >>>>>>>>>> Regards,
>> >>>>>>>>>> Ismaël
>> >>
>> >>
>>
>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <lc...@google.com> wrote:
>> >
>> > I had mentioned that approach 1 and approach 2 work for cross language.
>> The difference being that the cross language transform would take a well
>> known definition and convert it to the Read transform. A normal user would
>> have a pipeline that would look like:
>> > 1: PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) ->
>> PCollection<Output>
>> >
>> > And in the cross language case this would look like:
>> > 1: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to
>> Read) -> PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
>> > 2: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to
>> SourceDescriptor) -> PCollection<SourceDescriptor> -> PTransform(ReadAll)
>> -> PCollection<Output>*
>> > * note that PTransform(Convert Row to SourceDescriptor) only exists
>> since we haven't solved how to use schemas with language bound types in a
>> cross language way. SchemaCoder isn't portable but RowCoder is which is why
>> the conversion step exists. We could have a solution for this at some point
>> in time.
>> >
>> > My concern with using Read was around:
>> > a) Do all properties set on a Read apply to the ReadAll? For example,
>> the Kafka Read implementation allows you to set the key and value
>> deserializers which are also used to dictate the output PCollection type.
>> It also allows you to set how the watermark should be computed. Technically
>> a user may want the watermark computation to be configurable per Read and
>> they may also want an output type which is polymorphic (e.g.
>> PCollection<Serializable>).
>> > b) Read extends PTransform which brings its own object modelling
>> concerns.
>> >
>> > During the implementations of ReadAll(PCollection<Read>), was it
>> discovered that some properties became runtime errors or were ignored if
>> they were set? If no, then the code deduplication is likely worth it
>> because we also get a lot of javadoc deduplication, but if yes is this an
>> acceptable user experience?
>> >
>> >
>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <
>> aromanenko....@gmail.com> wrote:
>> >>
>> >> I believe that the initial goal of unifying ReadAll as a general
>> "PTransform<PCollection<Read>, PCollection<OutputType>>” was to reduce the
>> amount of code duplication and error-prone approach related to this. It
>> makes much sense since usually we have all needed configuration set in Read
>> objects and, as Ismaeil mentioned, ReadAll will consist mostly of only
>> Split-Shuffle-Read stages.  So this case usually can be unified by using
>> PCollection<Read> as input.
>> >>
>> >> On the other hand, we have another need to use Java IOs as
>> cross-language transforms (as Luke described) which seems only partly in
>> common with previous pattern of ReadAll using.
>> >>
>> >> I’d be more in favour to have only one concept of read configuration
>> for all needs but seems it’s not easy and I’d be more in favour with Luke
>> and Boyuan approach with schema. Though, maybe ReadAll is not a very
>> suitable name in this case because it will can bring some confusions
>> related to previous pattern of ReadAll uses.
>> >>
>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang <boyu...@google.com> wrote:
>> >>
>> >> Sorry for the typo. I mean I think we can go with (3) and (4): use the
>> data type that is schema-aware as the input of ReadAll.
>> >>
>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <boyu...@google.com>
>> wrote:
>> >>>
>> >>> Thanks for the summary, Cham!
>> >>>
>> >>> I think we can go with (2) and (4): use the data type that is
>> schema-aware as the input of ReadAll.
>> >>>
>> >>> Converting Read into ReadAll helps us to stick with SDF-like IO. But
>> only having  (3) is not enough to solve the problem of using ReadAll in
>> x-lang case.
>> >>>
>> >>> The key point of ReadAll is that the input type of ReadAll should be
>> able to cross language boundaries and have compatibilities of
>> updating/downgrading. After investigating some possibilities(pure java pojo
>> with custom coder, protobuf, row/schema) in Kafka usage, we find that
>> row/schema fits our needs most. Here comes (4). I believe that using Read
>> as input of ReadAll makes sense in some cases, but I also think not all IOs
>> have the same need. I would treat Read as a special type as long as the
>> Read is schema-aware.
>> >>>
>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <
>> chamik...@google.com> wrote:
>> >>>>
>> >>>> I see. So it seems like there are three options discussed so far
>> when it comes to defining source descriptors for ReadAll type transforms
>> >>>>
>> >>>> (1) Use Read PTransform as the element type of the input PCollection
>> >>>> (2) Use a POJO that describes the source as the data element of the
>> input PCollection
>> >>>> (3) Provide a converter as a function to the Read transform which
>> essentially will convert it to a ReadAll (what Eugene mentioned)
>> >>>>
>> >>>> I feel like (3) is more suitable for a related set of source
>> descriptions such as files.
>> >>>> (1) will allow most code-reuse but seems like will make it hard to
>> use the ReadAll transform as a cross-language transform and will break the
>> separation of construction time and runtime constructs
>> >>>> (2) could result to less code reuse if not careful but will make the
>> transform easier to be used as a cross-language transform without
>> additional modifications
>> >>>>
>> >>>> Also, with SDF, we can create ReadAll-like transforms that are more
>> efficient. So we might be able to just define all sources in that format
>> and make Read transforms just an easy to use composite built on top of that
>> (by adding a preceding Create transform).
>> >>>>
>> >>>> Thanks,
>> >>>> Cham
>> >>>>
>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <lc...@google.com> wrote:
>> >>>>>
>> >>>>> I believe we do require PTransforms to be serializable since
>> anonymous DoFns typically capture the enclosing PTransform.
>> >>>>>
>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <
>> chamik...@google.com> wrote:
>> >>>>>>
>> >>>>>> Seems like Read in PCollection<Read> refers to a transform, at
>> least here:
>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>> >>>>>>
>> >>>>>> I'm in favour of separating construction time transforms from
>> execution time data objects that we store in PCollections as Luke
>> mentioned. Also, we don't guarantee that PTransform is serializable so
>> users have the additional complexity of providing a corder whenever a
>> PTransform is used as a data object.
>> >>>>>> Also, agree with Boyuan that using simple Java objects that are
>> convertible to Beam Rows allow us to make these transforms available to
>> other SDKs through the cross-language transforms. Using transforms or
>> complex sources as data objects will probably make this difficult.
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>> Cham
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <boyu...@google.com>
>> wrote:
>> >>>>>>>
>> >>>>>>> Hi Ismael,
>> >>>>>>>
>> >>>>>>> I think the ReadAll in the IO connector refers to the IO with SDF
>> implementation despite the type of input, where Read refers to
>> UnboundedSource.  One major pushback of using KafkaIO.Read as source
>> description is that not all configurations of KafkaIO.Read are meaningful
>> to populate during execution time. Also when thinking about x-lang useage,
>> making source description across language boundaries is also necessary.  As
>> Luke mentioned, it's quite easy to infer a Schema from an AutoValue object:
>> KafkaSourceDescription.java. Then the coder of this schema-aware object
>> will be a SchemaCoder. When crossing language boundaries, it's also easy to
>> convert a Row into the source description: Convert.fromRows.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <lc...@google.com>
>> wrote:
>> >>>>>>>>
>> >>>>>>>> To provide additional context, the KafkaIO ReadAll transform
>> takes a PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is a
>> POJO that contains the configurable parameters for reading from Kafka. This
>> is different from the pattern that Ismael listed because they take
>> PCollection<Read> as input and the Read is the same as the Read PTransform
>> class used for the non read all case.
>> >>>>>>>>
>> >>>>>>>> The KafkaSourceDescriptor does lead to duplication since
>> parameters used to configure the transform have to be copied over to the
>> source descriptor but decouples how a transform is specified from the
>> object that describes what needs to be done. I believe Ismael's point is
>> that we wouldn't need such a decoupling.
>> >>>>>>>>
>> >>>>>>>> Another area that hasn't been discussed and I believe is a
>> non-issue is that the Beam Java SDK has the most IO connectors and we would
>> want to use the IO implementations within Beam Go and Beam Python. This
>> brings in its own set of issues related to versioning and compatibility for
>> the wire format and how one parameterizes such transforms. The wire format
>> issue can be solved with either approach by making sure that the cross
>> language expansion always takes the well known format (whatever it may be)
>> and converts it into Read/KafkaSourceDescriptor/... object that is then
>> passed to the ReadAll transform. Boyuan has been looking to make the
>> KafkaSourceDescriptor have a schema so it can be represented as a row and
>> this can be done easily using the AutoValue integration (I don't believe
>> there is anything preventing someone from writing a schema row -> Read ->
>> row adapter or also using the AutoValue configuration if the transform is
>> also an AutoValue).
>> >>>>>>>>
>> >>>>>>>> I would be more for the code duplication and separation of
>> concerns provided by using a different object to represent the contents of
>> the PCollection from the pipeline construction time PTransform.
>> >>>>>>>>
>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <
>> kirpic...@google.com> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Hi Ismael,
>> >>>>>>>>>
>> >>>>>>>>> Thanks for taking this on. Have you considered an approach
>> similar (or dual) to FileIO.write(), where we in a sense also have to
>> configure a dynamic number of different IO transforms of the same type (file
>> writes)?
>> >>>>>>>>>
>> >>>>>>>>> E.g. how in this example we configure many aspects of many file
>> writes:
>> >>>>>>>>>
>> >>>>>>>>> transactions.apply(FileIO.<TransactionType,
>> Transaction>writeDynamic()
>> >>>>>>>>>      .by(Transaction::getType)
>> >>>>>>>>>      .via(tx -> tx.getType().toFields(tx),  // Convert the data
>> to be written to CSVSink
>> >>>>>>>>>           type -> new CSVSink(type.getFieldNames()))
>> >>>>>>>>>      .to(".../path/to/")
>> >>>>>>>>>      .withNaming(type -> defaultNaming(type + "-transactions",
>> ".csv"));
>> >>>>>>>>>
>> >>>>>>>>> we could do something similar for many JdbcIO reads:
>> >>>>>>>>>
>> >>>>>>>>> PCollection<Bar> bars;  // user-specific type from which all
>> the read parameters can be inferred
>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>> >>>>>>>>>   .fromQuery(bar -> ...compute query for this bar...)
>> >>>>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
>> >>>>>>>>>   .withBatchSize(bar -> ...compute batch size for this bar...)
>> >>>>>>>>>   ...etc);
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <ieme...@gmail.com>
>> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>> Hello,
>> >>>>>>>>>>
>> >>>>>>>>>> (my excuses for the long email but this requires context)
>> >>>>>>>>>>
>> >>>>>>>>>> As part of the move from Source based IOs to DoFn based ones,
>> one pattern
>> >>>>>>>>>> emerged due to the composable nature of DoFn. The idea is to
>> have a different
>> >>>>>>>>>> kind of composable reads where we take a PCollection of
>> different sorts of
>> >>>>>>>>>> intermediate specifications e.g. tables, queries, etc, for
>> example:
>> >>>>>>>>>>
>> >>>>>>>>>> JdbcIO:
>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends
>> >>>>>>>>>> PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>> >>>>>>>>>>
>> >>>>>>>>>> RedisIO:
>> >>>>>>>>>> ReadAll extends PTransform<PCollection<String>,
>> PCollection<KV<String, String>>>
>> >>>>>>>>>>
>> >>>>>>>>>> HBaseIO:
>> >>>>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>,
>> PCollection<Result>>
>> >>>>>>>>>>
>> >>>>>>>>>> These patterns enabled richer use cases like doing multiple
>> queries in the same
>> >>>>>>>>>> Pipeline, querying based on key patterns or querying from
>> multiple tables at the
>> >>>>>>>>>> same time but came with some maintenance issues:
>> >>>>>>>>>>
>> >>>>>>>>>> - We ended up needing to add to the ReadAll transforms the
>> parameters for
>> >>>>>>>>>>   missing information so we ended up with lots of duplicated
>> `with` methods and
>> >>>>>>>>>>   error-prone code copied from the Read transforms into the ReadAll
>> transforms.
>> >>>>>>>>>>
>> >>>>>>>>>> - When you require new parameters you have to expand the input
>> parameters of the
>> >>>>>>>>>>   intermediary specification into something that resembles the
>> full `Read`
>> >>>>>>>>>>   definition for example imagine you want to read from
>> multiple tables or
>> >>>>>>>>>>   servers as part of the same pipeline but this was not in the
>> intermediate
>> >>>>>>>>>>   specification you end up adding those extra methods
>> (duplicating more code)
>> >>>>>>>>>>   just to get close to the full Read spec.
>> >>>>>>>>>>
>> >>>>>>>>>> - If new parameters are added to the Read method we end up
>> adding them
>> >>>>>>>>>>   systematically to the ReadAll transform too so they are
>> taken into account.
>> >>>>>>>>>>
>> >>>>>>>>>> Due to these issues I recently did a change to test a new
>> approach that is
>> >>>>>>>>>> simpler, more complete and maintainable. The code became:
>> >>>>>>>>>>
>> >>>>>>>>>> HBaseIO:
>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>,
>> PCollection<Result>>
>> >>>>>>>>>>
>> >>>>>>>>>> With this approach users gain benefits of improvements on
>> parameters of normal
>> >>>>>>>>>> Read because they can count on the full Read parameters. But of
>> course there are
>> >>>>>>>>>> some minor caveats:
>> >>>>>>>>>>
>> >>>>>>>>>> 1. You need to push some information into normal Reads for
>> example
>> >>>>>>>>>>    partition boundaries information or Restriction information
>> (in the SDF
>> >>>>>>>>>>    case).  Notice that this consistent approach of ReadAll
>> produces a simple
>> >>>>>>>>>>    pattern that ends up being almost reusable between IOs
>> (e.g. the non-SDF
>> >>>>>>>>>>    case):
>> >>>>>>>>>>
>> >>>>>>>>>>   public static class ReadAll extends
>> PTransform<PCollection<Read>,
>> >>>>>>>>>> PCollection<SolrDocument>> {
>> >>>>>>>>>>     @Override
>> >>>>>>>>>>     public PCollection<SolrDocument> expand(PCollection<Read>
>> input) {
>> >>>>>>>>>>       return input
>> >>>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>> >>>>>>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>> >>>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>> >>>>>>>>>>     }
>> >>>>>>>>>>   }
>> >>>>>>>>>>
>> >>>>>>>>>> 2. If you are using generic types for the results of ReadAll, you
>> must have the
>> >>>>>>>>>>    Coders used in its definition and require consistent types
>> from the data
>> >>>>>>>>>>    sources, in practice this means we need to add extra
>> withCoder method(s) on
>> >>>>>>>>>>    ReadAll but not the full specs.
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow this ReadAll
>> pattern. RedisIO
>> >>>>>>>>>> and CassandraIO already have WIP PRs to do so. So I wanted to
>> bring this subject
>> >>>>>>>>>> to the mailing list to see your opinions, and if you see any
>> sort of issues that
>> >>>>>>>>>> we might be missing with this idea.
>> >>>>>>>>>>
>> >>>>>>>>>> Also I would like to see if we have consensus to start using
>> consistently the
>> >>>>>>>>>> terminology of ReadAll transforms based on Read and the
>> readAll() method for new
>> >>>>>>>>>> IOs (at this point probably undoing this in the only
>> remaining inconsistent
>> >>>>>>>>>> place in JdbcIO might not be a good idea but apart from this we
>> should be ok).
>> >>>>>>>>>>
>> >>>>>>>>>> I mention this because the recent PR on KafkaIO based on SDF
>> is doing something
>> >>>>>>>>>> similar to the old pattern but being called ReadAll and maybe
>> it is worth being
>> >>>>>>>>>> consistent for the benefit of users.
>> >>>>>>>>>>
>> >>>>>>>>>> Regards,
>> >>>>>>>>>> Ismaël
>> >>
>> >>
>>
>>
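
For readers following the thread, the descriptor-based variant discussed above (a small element type that can be rebuilt from a language-neutral row, fed into a ReadAll-style step) can be sketched in plain Java. This is only an illustration under stated assumptions: it does not use the Beam API, and `SourceDescriptor`, `fromRow`, `readAll`, and `fakeRead` are hypothetical names invented here. A `Map` stands in for a schema Row and a `List` stands in for a PCollection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class ReadAllSketch {

  // Hypothetical element type: only the fields that are meaningful at execution time.
  static final class SourceDescriptor {
    final String topic;
    final int partition;
    final long startOffset;

    SourceDescriptor(String topic, int partition, long startOffset) {
      this.topic = topic;
      this.partition = partition;
      this.startOffset = startOffset;
    }

    // Stand-in for a schema-based conversion: build the descriptor from a
    // language-neutral "row", modelled here as a map of field name to value.
    static SourceDescriptor fromRow(Map<String, Object> row) {
      return new SourceDescriptor(
          (String) row.get("topic"),
          (Integer) row.get("partition"),
          (Long) row.get("startOffset"));
    }
  }

  // Stand-in for a ReadAll transform: maps every input element to its records.
  static <InputT, OutputT> List<OutputT> readAll(
      List<InputT> inputs, Function<InputT, List<OutputT>> readFn) {
    List<OutputT> out = new ArrayList<>();
    for (InputT input : inputs) {
      out.addAll(readFn.apply(input));
    }
    return out;
  }

  // Fake reader used only for the demonstration; it touches no real system.
  static List<String> fakeRead(SourceDescriptor d) {
    return List.of(d.topic + "-" + d.partition + "@" + d.startOffset);
  }

  public static void main(String[] args) {
    // Rows as they might arrive from another SDK after cross-language expansion.
    List<Map<String, Object>> rows = List.of(
        Map.<String, Object>of("topic", "orders", "partition", 0, "startOffset", 10L),
        Map.<String, Object>of("topic", "orders", "partition", 1, "startOffset", 42L));
    List<SourceDescriptor> descriptors = new ArrayList<>();
    for (Map<String, Object> row : rows) {
      descriptors.add(SourceDescriptor.fromRow(row));
    }
    System.out.println(readAll(descriptors, ReadAllSketch::fakeRead));
  }
}
```

In this sketch the descriptor carries no construction-time configuration, which mirrors the separation of concerns argued for in the thread. The alternative of passing the full Read object as the element would replace `SourceDescriptor` with the transform's own configuration class.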
