For the unbounded SDF in Kafka, we also consider upgrade/downgrade
compatibility in the pipeline update scenario (for a detailed discussion,
please refer to
https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E).
To obtain that compatibility, the input of the read SDF must be
schema-aware.

Thus the major constraint of mapping KafkaSourceDescriptor to
PCollection<Read> is that KafkaIO.Read also needs to be schema-aware;
otherwise pipeline updates might fail unnecessarily. Looking into
KafkaIO.Read, not all of the necessary fields are compatible with schemas,
for example, SerializableFunction.
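
To illustrate what "not compatible with schema" means here, below is a
minimal sketch in plain Java. It does not use any Beam API: the class
SchemaFieldCheck, its methods, and the field names topic, maxNumRecords and
timestampFn are all made up for illustration. It treats a configuration as
named values and flags the ones that are code rather than data, which is
roughly the problem a function-valued field poses for a row representation:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper; it models the idea only and uses no Beam API. */
final class SchemaFieldCheck {

  /** True if the value is plain data that a row of primitive fields could carry. */
  static boolean isRowCompatible(Object value) {
    return value == null
        || value instanceof String
        || value instanceof Number
        || value instanceof Boolean;
  }

  /** Returns the names of the fields that are not plain data. */
  static List<String> incompatibleFields(Map<String, Object> config) {
    List<String> bad = new ArrayList<>();
    for (Map.Entry<String, Object> e : config.entrySet()) {
      if (!isRowCompatible(e.getValue())) {
        bad.add(e.getKey());
      }
    }
    return bad;
  }

  /** Builds an illustrative Read-like configuration; the field names are assumptions. */
  static Map<String, Object> exampleReadConfig() {
    Map<String, Object> config = new LinkedHashMap<>();
    config.put("topic", "events");
    config.put("maxNumRecords", 1000L);
    // A user-supplied function is opaque code, not data.
    Function<Long, Long> timestampFn = offset -> offset * 2;
    config.put("timestampFn", timestampFn);
    return config;
  }

  public static void main(String[] args) {
    // Prints the names of the fields that could not be put into a row.
    System.out.println(incompatibleFields(exampleReadConfig()));
  }
}
```

In this toy model only timestampFn is reported. Real schema inference in
Beam is richer than this check, so please read it as a sketch of the
constraint and not as how KafkaIO.Read is actually analyzed.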

I'm somewhat confused about why ReadAll<Read, OutputT> is a common pattern
for SDF-based IO. Read can be a common pattern because its input is always
a PBegin. But for an SDF-based IO, the input can be anything. By using Read
as input, we will still have a maintenance cost when the SDF IO supports a
new field but Read doesn't consume it. For example, we are discussing
adding endOffset and endReadTime to KafkaSourceDescriptor, which are not
used in KafkaIO.Read.
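
As a rough sketch of why a separate, schema-like descriptor makes this kind
of evolution cheap, here is a plain-Java model (again no Beam API; the class
DescriptorSketch and its Map-based "row" are assumptions for illustration,
not the real KafkaSourceDescriptor). A newly added endOffset field is simply
absent from rows written before the field existed and decodes as "not set":

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a source descriptor; NOT the real KafkaSourceDescriptor. */
final class DescriptorSketch {
  final String topic;
  final int partition;
  final Long startOffset; // null means "not set"
  final Long endOffset;   // newly added field; null means "not set"

  DescriptorSketch(String topic, int partition, Long startOffset, Long endOffset) {
    this.topic = topic;
    this.partition = partition;
    this.startOffset = startOffset;
    this.endOffset = endOffset;
  }

  /** Flattens the descriptor into named plain values (a toy "row"). */
  Map<String, Object> toRow() {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("topic", topic);
    row.put("partition", partition);
    row.put("startOffset", startOffset);
    row.put("endOffset", endOffset);
    return row;
  }

  /** Rebuilds a descriptor; fields missing from an older row decode as null. */
  static DescriptorSketch fromRow(Map<String, Object> row) {
    return new DescriptorSketch(
        (String) row.get("topic"),
        (Integer) row.get("partition"),
        (Long) row.get("startOffset"),
        (Long) row.get("endOffset"));
  }

  public static void main(String[] args) {
    // A row as it might have been written before endOffset was introduced.
    Map<String, Object> oldRow = new LinkedHashMap<>();
    oldRow.put("topic", "events");
    oldRow.put("partition", 3);
    oldRow.put("startOffset", 42L);
    System.out.println(fromRow(oldRow).endOffset);
  }
}
```

The actual update-compatibility rules for Beam schemas are more involved
than "missing means null"; the point is only that the descriptor can gain
fields without touching Read.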

On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía <ieme...@gmail.com> wrote:

> We forgot to mention (5) External.Config used in cross-lang, see KafkaIO
> ExternalTransformBuilder. This approach is the predecessor of (4) and
> probably a
> really good candidate to be replaced by the Row based Configuration Boyuan
> is
> envisioning (so good to be aware of this).
>
> Thanks for the clear explanation, Luke; you mention the real issue(s). All
> the
> approaches discussed so far in the end could be easily transformed to
> produce a
> PCollection<Read> and those Read Elements could be read by the generic
> ReadAll
> transform. Notice that this can be internal in some IOs e.g. KafkaIO if
> they
> decide not to expose it. I am not saying that we should force every IO to
> support ReadAll in its public API but if we do it is probably a good idea
> to be
> consistent with naming the transform that expects an input
> PCollection<Read> in
> the same way. Also notice that using it will save us from the maintenance
> issues
> discussed in my previous email.
>
> Back to the main concern: the consequences of expansion based on Read: So
> far I
> have not seen consequences for the Splitting part, which maps really nicely
> assuming the Partition info / Restriction is available as part of Read. So
> far
> there are no Serialization issues either because Beam is already enforcing this. Notice
> that
> ReadAll expansion is almost ‘equivalent’ to a poor man's SDF at least for the
> Bounded case (see the code in my previous email). For the other points:
>
> > a) Do all properties set on a Read apply to the ReadAll? For example, the
> > Kafka Read implementation allows you to set the key and value
> deserializers
> > which are also used to dictate the output PCollection type. It also
> allows you
> > to set how the watermark should be computed. Technically a user may want
> the
> > watermark computation to be configurable per Read and they may also want
> an
> > output type which is polymorphic (e.g. Pcollection<Serializable>).
>
> Most of the time they do, but for parametric types we cannot support
> different
> types in the outputs of the Read or at least I did not find how to do so
> (is
> there a way to use multiple output Coders on Beam?), we saw this in
> CassandraIO
> and we were discussing adding explicitly these Coders or Serializer
> specific methods to the ReadAll transform. This is less nice because it
> will
> imply some repeated methods, but it is still a compromise to gain the other
> advantages. I suppose the watermark case you mention is similar because
> you may
> want the watermark to behave differently in each Read and we probably don’t
> support this, so it corresponds to the polymorphic category.
>
> > b) Read extends PTransform which brings its own object modelling
> concerns.
>
> > During the implementations of ReadAll(PCollection<Read>), was it
> discovered
> > that some properties became runtime errors or were ignored if they were
> set?
> > If no, then the code deduplication is likely worth it because we also
> get a
> > lot of javadoc deduplication, but if yes is this an acceptable user
> > experience?
>
> No, not so far. This is an interesting part, notice that the Read
> translation
> ends up delegating the read bits to the ReadFn part of ReadAll so the
> ReadFn is
> the real read and must be aware and use all the parameters.
>
>     @Override
>     public PCollection<SolrDocument> expand(PBegin input) {
>       return input.apply("Create", Create.of(this)).apply("ReadAll",
> readAll());
>     }
>
> I might be missing something for the Unbounded SDF case which is the only
> case
> we have not explored so far. I think one easy way to see the limitations
> would
> be in the ongoing KafkaIO SDF based implementation to try to map
> KafkaSourceDescriptor to do the extra PCollection<Read> and the Read logic
> on
> the ReadAll with the SDF to see which constraints we hit, the polymorphic
> ones
> will be there for sure, maybe others will appear (not sure). However it
> would be
> interesting to see if we have a real gain in the maintenance points, but
> well
> let’s not forget also that KafkaIO has a LOT of knobs so probably the
> generic
> implementation could be relatively complex.
>
>
>
> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <lc...@google.com> wrote:
> >
> > I had mentioned that approach 1 and approach 2 work for cross language.
> The difference being that the cross language transform would take a well
> known definition and convert it to the Read transform. A normal user would
> have a pipeline that would look like:
> > 1: PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) ->
> PCollection<Output>
> >
> > And in the cross language case this would look like:
> > 1: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to
> Read) -> PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
> > 2: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to
> SourceDescriptor) -> PCollection<SourceDescriptor> -> PTransform(ReadAll)
> -> PCollection<Output>*
> > * note that PTransform(Convert Row to SourceDescriptor) only exists
> since we haven't solved how to use schemas with language bound types in a
> cross language way. SchemaCoder isn't portable but RowCoder is which is why
> the conversion step exists. We could have a solution for this at some point
> in time.
> >
> > My concern with using Read was around:
> > a) Do all properties set on a Read apply to the ReadAll? For example,
> the Kafka Read implementation allows you to set the key and value
> deserializers which are also used to dictate the output PCollection type.
> It also allows you to set how the watermark should be computed. Technically
> a user may want the watermark computation to be configurable per Read and
> they may also want an output type which is polymorphic (e.g.
> PCollection<Serializable>).
> > b) Read extends PTransform which brings its own object modelling
> concerns.
> >
> > During the implementations of ReadAll(PCollection<Read>), was it
> discovered that some properties became runtime errors or were ignored if
> they were set? If no, then the code deduplication is likely worth it
> because we also get a lot of javadoc deduplication, but if yes is this an
> acceptable user experience?
> >
> >
> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <
> aromanenko....@gmail.com> wrote:
> >>
> >> I believe that the initial goal of unifying ReadAll as a general
> "PTransform<PCollection<Read>, PCollection<OutputType>>” was to reduce the
> amount of code duplication and error-prone approach related to this. It
> makes much sense since usually we have all needed configuration set in Read
> objects and, as Ismaël mentioned, ReadAll will consist mostly of only
> Split-Shuffle-Read stages.  So this case usually can be unified by using
> PCollection<Read> as input.
> >>
> >> On the other hand, we have another need to use Java IOs as
> cross-language transforms (as Luke described) which seems only partly in
> common with the previous pattern of ReadAll usage.
> >>
> >> I’d be more in favour of having only one concept of read configuration
> for all needs, but it seems that’s not easy, so I’d side with Luke's
> and Boyuan's schema-based approach. Though, maybe ReadAll is not a very
> suitable name in this case because it can cause some confusion
> with the previous pattern of ReadAll usage.
> >>
> >> On 25 Jun 2020, at 05:00, Boyuan Zhang <boyu...@google.com> wrote:
> >>
> >> Sorry for the typo. I mean I think we can go with (3) and (4): use the
> data type that is schema-aware as the input of ReadAll.
> >>
> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <boyu...@google.com>
> wrote:
> >>>
> >>> Thanks for the summary, Cham!
> >>>
> >>> I think we can go with (2) and (4): use the data type that is
> schema-aware as the input of ReadAll.
> >>>
> >>> Converting Read into ReadAll helps us to stick with SDF-like IO. But
> only having  (3) is not enough to solve the problem of using ReadAll in
> x-lang case.
> >>>
> >>> The key point of ReadAll is that the input type of ReadAll should be
> able to cross language boundaries and stay compatible across pipeline
> updates/downgrades. After investigating some possibilities (pure Java POJO
> with custom coder, protobuf, row/schema) in Kafka usage, we find that
> row/schema fits our needs best. Here comes (4). I believe that using Read
> as input of ReadAll makes sense in some cases, but I also think not all IOs
> have the same need. I would treat Read as a special type as long as the
> Read is schema-aware.
> >>>
> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
> >>>>
> >>>> I see. So it seems like there are three options discussed so far when
> it comes to defining source descriptors for ReadAll type transforms
> >>>>
> >>>> (1) Use Read PTransform as the element type of the input PCollection
> >>>> (2) Use a POJO that describes the source as the data element of the
> input PCollection
> >>>> (3) Provide a converter as a function to the Read transform which
> essentially will convert it to a ReadAll (what Eugene mentioned)
> >>>>
> >>>> I feel like (3) is more suitable for a related set of source
> descriptions such as files.
> >>>> (1) will allow most code-reuse but seems like will make it hard to
> use the ReadAll transform as a cross-language transform and will break the
> separation of construction time and runtime constructs
> >>>> (2) could result in less code reuse if not careful but will make the
> transform easier to be used as a cross-language transform without
> additional modifications
> >>>>
> >>>> Also, with SDF, we can create ReadAll-like transforms that are more
> efficient. So we might be able to just define all sources in that format
> and make Read transforms just an easy to use composite built on top of that
> (by adding a preceding Create transform).
> >>>>
> >>>> Thanks,
> >>>> Cham
> >>>>
> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <lc...@google.com> wrote:
> >>>>>
> >>>>> I believe we do require PTransforms to be serializable since
> anonymous DoFns typically capture the enclosing PTransform.
> >>>>>
> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <
> chamik...@google.com> wrote:
> >>>>>>
> >>>>>> Seems like Read in PCollection<Read> refers to a transform, at
> least here:
> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
> >>>>>>
> >>>>>> I'm in favour of separating construction time transforms from
> execution time data objects that we store in PCollections as Luke
> mentioned. Also, we don't guarantee that PTransform is serializable so
> users have the additional complexity of providing a coder whenever a
> PTransform is used as a data object.
> >>>>>> Also, agree with Boyuan that using simple Java objects that are
> convertible to Beam Rows allows us to make these transforms available to
> other SDKs through the cross-language transforms. Using transforms or
> complex sources as data objects will probably make this difficult.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Cham
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <boyu...@google.com>
> wrote:
> >>>>>>>
> >>>>>>> Hi Ismael,
> >>>>>>>
> >>>>>>> I think the ReadAll in the IO connector refers to the IO with SDF
> implementation despite the type of input, where Read refers to
> UnboundedSource.  One major pushback of using KafkaIO.Read as source
> description is that not all configurations of KafkaIO.Read are meaningful
> to populate during execution time. Also when thinking about x-lang usage,
> making source description across language boundaries is also necessary.  As
> Luke mentioned, it's quite easy to infer a Schema from an AutoValue object:
> KafkaSourceDescription.java. Then the coder of this schema-aware object
> will be a SchemaCoder. When crossing language boundaries, it's also easy to
> convert a Row into the source description: Convert.fromRows.
> >>>>>>>
> >>>>>>>
> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <lc...@google.com>
> wrote:
> >>>>>>>>
> >>>>>>>> To provide additional context, the KafkaIO ReadAll transform
> takes a PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is a
> POJO that contains the configurable parameters for reading from Kafka. This
> is different from the pattern that Ismael listed because they take
> PCollection<Read> as input and the Read is the same as the Read PTransform
> class used for the non read all case.
> >>>>>>>>
> >>>>>>>> The KafkaSourceDescriptor does lead to duplication since
> parameters used to configure the transform have to be copied over to the
> source descriptor but decouples how a transform is specified from the
> object that describes what needs to be done. I believe Ismael's point is
> that we wouldn't need such a decoupling.
> >>>>>>>>
> >>>>>>>> Another area that hasn't been discussed and I believe is a
> non-issue is that the Beam Java SDK has the most IO connectors and we would
> want to use the IO implementations within Beam Go and Beam Python. This
> brings in its own set of issues related to versioning and compatibility for
> the wire format and how one parameterizes such transforms. The wire format
> issue can be solved with either approach by making sure that the cross
> language expansion always takes the well known format (whatever it may be)
> and converts it into Read/KafkaSourceDescriptor/... object that is then
> passed to the ReadAll transform. Boyuan has been looking to make the
> KafkaSourceDescriptor have a schema so it can be represented as a row and
> this can be done easily using the AutoValue integration (I don't believe
> there is anything preventing someone from writing a schema row -> Read ->
> row adapter or also using the AutoValue configuration if the transform is
> also an AutoValue).
> >>>>>>>>
> >>>>>>>> I would be more for the code duplication and separation of
> concerns provided by using a different object to represent the contents of
> the PCollection from the pipeline construction time PTransform.
> >>>>>>>>
> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <
> kirpic...@google.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Ismael,
> >>>>>>>>>
> >>>>>>>>> Thanks for taking this on. Have you considered an approach
> similar (or dual) to FileIO.write(), where we in a sense also have to
> configure a dynamic number of different IO transforms of the same type (file
> writes)?
> >>>>>>>>>
> >>>>>>>>> E.g. how in this example we configure many aspects of many file
> writes:
> >>>>>>>>>
> >>>>>>>>> transactions.apply(FileIO.<TransactionType,
> Transaction>writeDynamic()
> >>>>>>>>>      .by(Transaction::getType)
> >>>>>>>>>      .via(tx -> tx.getType().toFields(tx),  // Convert the data
> to be written to CSVSink
> >>>>>>>>>           type -> new CSVSink(type.getFieldNames()))
> >>>>>>>>>      .to(".../path/to/")
> >>>>>>>>>      .withNaming(type -> defaultNaming(type + "-transactions",
> ".csv"));
> >>>>>>>>>
> >>>>>>>>> we could do something similar for many JdbcIO reads:
> >>>>>>>>>
> >>>>>>>>> PCollection<Bar> bars;  // user-specific type from which all the
> read parameters can be inferred
> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
> >>>>>>>>>   .fromQuery(bar -> ...compute query for this bar...)
> >>>>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
> >>>>>>>>>   .withBatchSize(bar -> ...compute batch size for this bar...)
> >>>>>>>>>   ...etc);
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <ieme...@gmail.com>
> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hello,
> >>>>>>>>>>
> >>>>>>>>>> (my excuses for the long email but this requires context)
> >>>>>>>>>>
> >>>>>>>>>> As part of the move from Source based IOs to DoFn based ones,
> one pattern
> >>>>>>>>>> emerged due to the composable nature of DoFn. The idea is to
> have a different
> >>>>>>>>>> kind of composable reads where we take a PCollection of
> different sorts of
> >>>>>>>>>> intermediate specifications e.g. tables, queries, etc, for
> example:
> >>>>>>>>>>
> >>>>>>>>>> JdbcIO:
> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends
> >>>>>>>>>> PTransform<PCollection<ParameterT>, PCollection<OutputT>>
> >>>>>>>>>>
> >>>>>>>>>> RedisIO:
> >>>>>>>>>> ReadAll extends PTransform<PCollection<String>,
> PCollection<KV<String, String>>>
> >>>>>>>>>>
> >>>>>>>>>> HBaseIO:
> >>>>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>,
> PCollection<Result>>
> >>>>>>>>>>
> >>>>>>>>>> These patterns enabled richer use cases like doing multiple
> queries in the same
> >>>>>>>>>> Pipeline, querying based on key patterns or querying from
> multiple tables at the
> >>>>>>>>>> same time but came with some maintenance issues:
> >>>>>>>>>>
> >>>>>>>>>> - We ended up needing to add to the ReadAll transforms the
> parameters for
> >>>>>>>>>>   missing information so we ended up with lots of duplicated
> "with" methods and
> >>>>>>>>>>   error-prone code copied from the Read transforms into the ReadAll
> transforms.
> >>>>>>>>>>
> >>>>>>>>>> - When you require new parameters you have to expand the input
> parameters of the
> >>>>>>>>>>   intermediary specification into something that resembles the
> full `Read`
> >>>>>>>>>>   definition for example imagine you want to read from multiple
> tables or
> >>>>>>>>>>   servers as part of the same pipeline but this was not in the
> intermediate
> >>>>>>>>>>   specification you end up adding those extra methods
> (duplicating more code)
> >>>>>>>>>>   just to get close to the full Read spec.
> >>>>>>>>>>
> >>>>>>>>>> - If new parameters are added to the Read method we end up
> adding them
> >>>>>>>>>>   systematically to the ReadAll transform too so they are taken
> into account.
> >>>>>>>>>>
> >>>>>>>>>> Due to these issues I recently did a change to test a new
> approach that is
> >>>>>>>>>> simpler, more complete and maintainable. The code became:
> >>>>>>>>>>
> >>>>>>>>>> HBaseIO:
> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>,
> PCollection<Result>>
> >>>>>>>>>>
> >>>>>>>>>> With this approach users gain benefits of improvements on
> parameters of normal
> >>>>>>>>>> Read because they can count on the full Read parameters. But of
> course there are
> >>>>>>>>>> some minor caveats:
> >>>>>>>>>>
> >>>>>>>>>> 1. You need to push some information into normal Reads for
> example
> >>>>>>>>>>    partition boundaries information or Restriction information
> (in the SDF
> >>>>>>>>>>    case).  Notice that this consistent approach of ReadAll
> produces a simple
> >>>>>>>>>>    pattern that ends up being almost reusable between IOs (e.g.
> the    non-SDF
> >>>>>>>>>>    case):
> >>>>>>>>>>
> >>>>>>>>>>   public static class ReadAll extends
> PTransform<PCollection<Read>,
> >>>>>>>>>> PCollection<SolrDocument>> {
> >>>>>>>>>>     @Override
> >>>>>>>>>>     public PCollection<SolrDocument> expand(PCollection<Read>
> input) {
> >>>>>>>>>>       return input
> >>>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
> >>>>>>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
> >>>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
> >>>>>>>>>>     }
> >>>>>>>>>>   }
> >>>>>>>>>>
> >>>>>>>>>> 2. If you are using Generic types for the results of ReadAll you
> must have the
> >>>>>>>>>>    Coders used in its definition and require consistent types
> from the data
> >>>>>>>>>>    sources, in practice this means we need to add extra
> withCoder method(s) on
> >>>>>>>>>>    ReadAll but not the full specs.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow this ReadAll
> pattern. RedisIO
> >>>>>>>>>> and CassandraIO have already WIP PRs to do so. So I wanted to
> bring this subject
> >>>>>>>>>> to the mailing list to see your opinions, and if you see any
> sort of issues that
> >>>>>>>>>> we might be missing with this idea.
> >>>>>>>>>>
> >>>>>>>>>> Also I would like to see if we have consensus to start using
> consistently the
> >>>>>>>>>> terminology of ReadAll transforms based on Read and the
> readAll() method for new
> >>>>>>>>>> IOs (at this point probably undoing this in the only remaining
> inconsistent
> >>>>>>>>>> place in JdbcIO might not be a good idea but apart from this we
> should be ok).
> >>>>>>>>>>
> >>>>>>>>>> I mention this because the recent PR on KafkaIO based on SDF is
> doing something
> >>>>>>>>>> similar to the old pattern but being called ReadAll and maybe
> it is worth being
> >>>>>>>>>> consistent for the benefit of users.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Ismaël
> >>
> >>
>
> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <lc...@google.com> wrote:
> >
> > I had mentioned that approach 1 and approach 2 work for cross language.
> The difference being that the cross language transform would take a well
> known definition and convert it to the Read transform. A normal user would
> have a pipeline that would look like:
> > 1: PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) ->
> PCollection<Output>
> >
> > And in the cross language case this would look like:
> > 1: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to
> Read) -> PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
> > 2: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to
> SourceDescriptor) -> PCollection<SourceDescriptor> -> PTransform(ReadAll)
> -> PCollection<Output>*
> > * note that PTransform(Convert Row to SourceDescriptor) only exists
> since we haven't solved how to use schemas with language bound types in a
> cross language way. SchemaCoder isn't portable but RowCoder is which is why
> the conversion step exists. We could have a solution for this at some point
> in time.
> >
> > My concern with using Read was around:
> > a) Do all properties set on a Read apply to the ReadAll? For example,
> the Kafka Read implementation allows you to set the key and value
> deserializers which are also used to dictate the output PCollection type.
> It also allows you to set how the watermark should be computed. Technically
> a user may want the watermark computation to be configurable per Read and
> they may also want an output type which is polymorphic (e.g.
> PCollection<Serializable>).
> > b) Read extends PTransform which brings its own object modelling
> concerns.
> >
> > During the implementations of ReadAll(PCollection<Read>), was it
> discovered that some properties became runtime errors or were ignored if
> they were set? If no, then the code deduplication is likely worth it
> because we also get a lot of javadoc deduplication, but if yes is this an
> acceptable user experience?
> >
> >
> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <
> aromanenko....@gmail.com> wrote:
> >>
> >> I believe that the initial goal of unifying ReadAll as a general
> "PTransform<PCollection<Read>, PCollection<OutputType>>” was to reduce the
> amount of code duplication and error-prone approach related to this. It
> makes much sense since usually we have all needed configuration set in Read
> objects and, as Ismaeil mentioned, ReadAll will consist mostly of only
> Split-Shuffle-Read stages.  So this case usually can be unified by using
> PCollection<Read> as input.
> >>
> >> On the other hand, we have another need to use Java IOs as
> cross-language transforms (as Luke described) which seems only partly in
> common with previous pattern of ReadAll using.
> >>
> >> I’d be more in favour to have only one concept of read configuration
> for all needs but seems it’s not easy and I’d be more in favour with Luke
> and Boyuan approach with schema. Though, maybe ReadAll is not a very
> suitable name in this case because it will can bring some confusions
> related to previous pattern of ReadAll uses.
> >>
> >> On 25 Jun 2020, at 05:00, Boyuan Zhang <boyu...@google.com> wrote:
> >>
> >> Sorry for the typo. I mean I think we can go with (3) and (4): use the
> data type that is schema-aware as the input of ReadAll.
> >>
> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <boyu...@google.com>
> wrote:
> >>>
> >>> Thanks for the summary, Cham!
> >>>
> >>> I think we can go with (2) and (4): use the data type that is
> schema-aware as the input of ReadAll.
> >>>
> >>> Converting Read into ReadAll helps us to stick with SDF-like IO. But
> only having  (3) is not enough to solve the problem of using ReadAll in
> x-lang case.
> >>>
> >>> The key point of ReadAll is that the input type of ReadAll should be
> able to cross language boundaries and have compatibilities of
> updating/downgrading. After investigating some possibilities(pure java pojo
> with custom coder, protobuf, row/schema) in Kafka usage, we find that
> row/schema fits our needs most. Here comes (4). I believe that using Read
> as input of ReadAll makes sense in some cases, but I also think not all IOs
> have the same need. I would treat Read as a special type as long as the
> Read is schema-aware.
> >>>
> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
> >>>>
> >>>> I see. So it seems like there are three options discussed so far when
> it comes to defining source descriptors for ReadAll type transforms
> >>>>
> >>>> (1) Use Read PTransform as the element type of the input PCollection
> >>>> (2) Use a POJO that describes the source as the data element of the
> input PCollection
> >>>> (3) Provide a converter as a function to the Read transform which
> essentially will convert it to a ReadAll (what Eugene mentioned)
> >>>>
> >>>> I feel like (3) is more suitable for a related set of source
> descriptions such as files.
> >>>> (1) will allow most code-reuse but seems like will make it hard to
> use the ReadAll transform as a cross-language transform and will break the
> separation of construction time and runtime constructs
> >>>> (2) could result to less code reuse if not careful but will make the
> transform easier to be used as a cross-language transform without
> additional modifications
> >>>>
> >>>> Also, with SDF, we can create ReadAll-like transforms that are more
> efficient. So we might be able to just define all sources in that format
> and make Read transforms just an easy to use composite built on top of that
> (by adding a preceding Create transform).
> >>>>
> >>>> Thanks,
> >>>> Cham
> >>>>
> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <lc...@google.com> wrote:
> >>>>>
> >>>>> I believe we do require PTransforms to be serializable since
> anonymous DoFns typically capture the enclosing PTransform.
> >>>>>
> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <
> chamik...@google.com> wrote:
> >>>>>>
> >>>>>> Seems like Read in PCollection<Read> refers to a transform, at
> least here:
> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
> >>>>>>
> >>>>>> I'm in favour of separating construction time transforms from
> execution time data objects that we store in PCollections as Luke
> mentioned. Also, we don't guarantee that PTransform is serializable so
> users have the additional complexity of providing a corder whenever a
> PTransform is used as a data object.
> >>>>>> Also, agree with Boyuan that using simple Java objects that are
> convertible to Beam Rows allow us to make these transforms available to
> other SDKs through the cross-language transforms. Using transforms or
> complex sources as data objects will probably make this difficult.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Cham
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <boyu...@google.com>
> wrote:
> >>>>>>>
> >>>>>>> Hi Ismael,
> >>>>>>>
> >>>>>>> I think the ReadAll in the IO connector refers to the IO with SDF
> implementation despite the type of input, where Read refers to
> UnboundedSource.  One major pushback of using KafkaIO.Read as source
> description is that not all configurations of KafkaIO.Read are meaningful
> to populate during execution time. Also when thinking about x-lang useage,
> making source description across language boundaries is also necessary.  As
> Luke mentioned, it's quite easy to infer a Schema from an AutoValue object:
> KafkaSourceDescription.java. Then the coder of this schema-aware object
> will be a SchemaCoder. When crossing language boundaries, it's also easy to
> convert a Row into the source description: Convert.fromRows.
> >>>>>>>
> >>>>>>>
> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <lc...@google.com>
> wrote:
> >>>>>>>>
> >>>>>>>> To provide additional context, the KafkaIO ReadAll transform
> takes a PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is a
> POJO that contains the configurable parameters for reading from Kafka. This
> is different from the pattern that Ismael listed because they take
> PCollection<Read> as input and the Read is the same as the Read PTransform
> class used for the non read all case.
> >>>>>>>>
> >>>>>>>> The KafkaSourceDescriptor does lead to duplication since
> parameters used to configure the transform have to be copied over to the
> source descriptor but decouples how a transform is specified from the
> object that describes what needs to be done. I believe Ismael's point is
> that we wouldn't need such a decoupling.
> >>>>>>>>
> >>>>>>>> Another area that hasn't been discussed and I believe is a
> non-issue is that the Beam Java SDK has the most IO connectors and we would
> want to use the IO implementations within Beam Go and Beam Python. This
> brings in its own set of issues related to versioning and compatibility for
> the wire format and how one parameterizes such transforms. The wire format
> issue can be solved with either approach by making sure that the cross
> language expansion always takes the well known format (whatever it may be)
> and converts it into Read/KafkaSourceDescriptor/... object that is then
> passed to the ReadAll transform. Boyuan has been looking to make the
> KafkaSourceDescriptor have a schema so it can be represented as a row and
> this can be done easily using the AutoValue integration (I don't believe
> there is anything preventing someone from writing a schema row -> Read ->
> row adapter or also using the AutoValue configuration if the transform is
> also an AutoValue).
> >>>>>>>>
> >>>>>>>> I would be more for the code duplication and separation of
> concerns provided by using a different object to represent the contents of
> the PCollection from the pipeline construction time PTransform.
> >>>>>>>>
> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <
> kirpic...@google.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Ismael,
> >>>>>>>>>
> >>>>>>>>> Thanks for taking this on. Have you considered an approach
> similar (or dual) to FileIO.write(), where we in a sense also have to
> configure a dynamic number of different IO transforms of the same type (file
> writes)?
> >>>>>>>>>
> >>>>>>>>> E.g. how in this example we configure many aspects of many file
> writes:
> >>>>>>>>>
> >>>>>>>>> transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
> >>>>>>>>>      .by(Transaction::getType)
> >>>>>>>>>      // Convert the data to be written to CSVSink
> >>>>>>>>>      .via(tx -> tx.getType().toFields(tx),
> >>>>>>>>>           type -> new CSVSink(type.getFieldNames()))
> >>>>>>>>>      .to(".../path/to/")
> >>>>>>>>>      .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
> >>>>>>>>>
> >>>>>>>>> we could do something similar for many JdbcIO reads:
> >>>>>>>>>
> >>>>>>>>> // user-specific type from which all the read parameters can be inferred
> >>>>>>>>> PCollection<Bar> bars;
> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
> >>>>>>>>>   .fromQuery(bar -> ...compute query for this bar...)
> >>>>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
> >>>>>>>>>   .withBatchSize(bar -> ...compute batch size for this bar...)
> >>>>>>>>>   ...etc);
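
As a rough illustration of this "dual" style, the sketch below models it in plain Java without Beam. The ReadAll class, its fromQuery/withBatchSize methods, and plan() are all hypothetical stand-ins (they are not the JdbcIO API); the point is only that the transform is configured once with functions, and each input element decides its own read parameters through them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class PerElementReadSketch {

  /** Hypothetical readAll-style builder: every parameter is a function of the input element. */
  static final class ReadAll<InT> {
    private final Function<InT, String> queryFn;
    private final Function<InT, Integer> batchSizeFn;

    private ReadAll(Function<InT, String> queryFn, Function<InT, Integer> batchSizeFn) {
      this.queryFn = queryFn;
      this.batchSizeFn = batchSizeFn;
    }

    /** Starts with placeholder defaults: an empty query and a batch size of 1. */
    static <InT> ReadAll<InT> create() {
      return new ReadAll<>(in -> "", in -> 1);
    }

    ReadAll<InT> fromQuery(Function<InT, String> fn) {
      return new ReadAll<>(fn, batchSizeFn);
    }

    ReadAll<InT> withBatchSize(Function<InT, Integer> fn) {
      return new ReadAll<>(queryFn, fn);
    }

    /** Resolves the per-element parameters; a real transform would execute the reads here. */
    List<String> plan(List<InT> inputs) {
      List<String> out = new ArrayList<>();
      for (InT in : inputs) {
        out.add(queryFn.apply(in) + " [batch=" + batchSizeFn.apply(in) + "]");
      }
      return out;
    }
  }

  public static void main(String[] args) {
    List<String> plans =
        ReadAll.<String>create()
            .fromQuery(table -> "SELECT * FROM " + table)
            .withBatchSize(table -> table.length())
            .plan(Arrays.asList("users", "orders"));
    plans.forEach(System.out::println);
  }
}
```

The trade-off compared to PCollection<Read> is that the element type stays whatever the user already has, at the cost of carrying one function per configurable parameter on the transform.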
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <ieme...@gmail.com>
> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hello,
> >>>>>>>>>>
> >>>>>>>>>> (my excuses for the long email but this requires context)
> >>>>>>>>>>
> >>>>>>>>>> As part of the move from Source based IOs to DoFn based ones,
> one pattern
> >>>>>>>>>> emerged due to the composable nature of DoFn. The idea is to
> have a different
> >>>>>>>>>> kind of composable reads where we take a PCollection of
> different sorts of
> >>>>>>>>>> intermediate specifications e.g. tables, queries, etc, for
> example:
> >>>>>>>>>>
> >>>>>>>>>> JdbcIO:
> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends
> >>>>>>>>>> PTransform<PCollection<ParameterT>, PCollection<OutputT>>
> >>>>>>>>>>
> >>>>>>>>>> RedisIO:
> >>>>>>>>>> ReadAll extends
> >>>>>>>>>> PTransform<PCollection<String>, PCollection<KV<String, String>>>
> >>>>>>>>>>
> >>>>>>>>>> HBaseIO:
> >>>>>>>>>> ReadAll extends
> >>>>>>>>>> PTransform<PCollection<HBaseQuery>, PCollection<Result>>
> >>>>>>>>>>
> >>>>>>>>>> These patterns enabled richer use cases like doing multiple
> queries in the same
> >>>>>>>>>> Pipeline, querying based on key patterns or querying from
> multiple tables at the
> >>>>>>>>>> same time but came with some maintenance issues:
> >>>>>>>>>>
> >>>>>>>>>> - We ended up needing to add to the ReadAll transforms the
> parameters for
> >>>>>>>>>>   missing information, so we ended up with lots of duplicated
> `with` methods and
> >>>>>>>>>>   error-prone code copied from the Read transforms into the ReadAll
> transforms.
> >>>>>>>>>>
> >>>>>>>>>> - When you require new parameters you have to expand the input
> parameters of the
> >>>>>>>>>>   intermediary specification into something that resembles the
> full `Read`
> >>>>>>>>>>   definition. For example, imagine you want to read from multiple
> tables or
> >>>>>>>>>>   servers as part of the same pipeline but this was not in the
> intermediate
> >>>>>>>>>>   specification: you end up adding those extra methods
> (duplicating more code)
> >>>>>>>>>>   just to get close to the full Read spec.
> >>>>>>>>>>
> >>>>>>>>>> - If new parameters are added to the Read method we end up
> adding them
> >>>>>>>>>>   systematically to the ReadAll transform too so they are taken
> into account.
> >>>>>>>>>>
> >>>>>>>>>> Due to these issues I recently did a change to test a new
> approach that is
> >>>>>>>>>> simpler, more complete and maintainable. The code became:
> >>>>>>>>>>
> >>>>>>>>>> HBaseIO:
> >>>>>>>>>> ReadAll extends
> >>>>>>>>>> PTransform<PCollection<Read>, PCollection<Result>>
> >>>>>>>>>>
> >>>>>>>>>> With this approach users gain the benefits of improvements on
> parameters of normal
> >>>>>>>>>> Read because they have access to the full Read parameters. But of
> course there are
> >>>>>>>>>> some minor caveats:
> >>>>>>>>>>
> >>>>>>>>>> 1. You need to push some information into normal Reads for
> example
> >>>>>>>>>>    partition boundaries information or Restriction information
> (in the SDF
> >>>>>>>>>>    case).  Notice that this consistent approach of ReadAll
> produces a simple
> >>>>>>>>>>    pattern that ends up being almost reusable between IOs (e.g.
> the non-SDF
> >>>>>>>>>>    case):
> >>>>>>>>>>
> >>>>>>>>>>   public static class ReadAll
> >>>>>>>>>>       extends PTransform<PCollection<Read>, PCollection<SolrDocument>> {
> >>>>>>>>>>     @Override
> >>>>>>>>>>     public PCollection<SolrDocument> expand(PCollection<Read> input) {
> >>>>>>>>>>       return input
> >>>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
> >>>>>>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
> >>>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
> >>>>>>>>>>     }
> >>>>>>>>>>   }
> >>>>>>>>>>
> >>>>>>>>>> 2. If you are using generic types for the results of ReadAll, you
> must have the
> >>>>>>>>>>    Coders used in its definition and require consistent types
> from the data
> >>>>>>>>>>    sources. In practice this means we need to add extra
> withCoder method(s) on
> >>>>>>>>>>    ReadAll but not the full specs.
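
The Split / Reshuffle / Read shape from caveat 1 can be modelled without Beam to see why it is nearly IO-agnostic. The following is only a sketch under stated assumptions: ReadSpec, its integer range, and the fake records are hypothetical, and a seeded Collections.shuffle stands in for Reshuffle (which in a real pipeline redistributes work rather than merely reordering a list).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class ReadAllSketch {

  /** Hypothetical full read spec, carrying its own [start, end) boundaries. */
  static final class ReadSpec {
    final String table;
    final int start;
    final int end;

    ReadSpec(String table, int start, int end) {
      this.table = table;
      this.start = start;
      this.end = end;
    }

    ReadSpec withRange(int newStart, int newEnd) {
      return new ReadSpec(table, newStart, newEnd);
    }
  }

  /** "Split": one spec becomes several specs covering at most chunk positions each. */
  static List<ReadSpec> split(ReadSpec spec, int chunk) {
    if (chunk <= 0) {
      throw new IllegalArgumentException("chunk must be positive");
    }
    List<ReadSpec> out = new ArrayList<>();
    for (int s = spec.start; s < spec.end; s += chunk) {
      out.add(spec.withRange(s, Math.min(s + chunk, spec.end)));
    }
    return out;
  }

  /** "Read": emits one fake record per position in the spec's range. */
  static List<String> read(ReadSpec spec) {
    List<String> out = new ArrayList<>();
    for (int i = spec.start; i < spec.end; i++) {
      out.add(spec.table + "#" + i);
    }
    return out;
  }

  /** Chains the three steps; only split() and read() are specific to the data source. */
  static List<String> readAll(List<ReadSpec> specs, int chunk, long seed) {
    List<ReadSpec> pieces = new ArrayList<>();
    for (ReadSpec spec : specs) {
      pieces.addAll(split(spec, chunk));
    }
    Collections.shuffle(pieces, new Random(seed)); // stand-in for Reshuffle
    List<String> records = new ArrayList<>();
    for (ReadSpec piece : pieces) {
      records.addAll(read(piece));
    }
    return records;
  }

  public static void main(String[] args) {
    List<ReadSpec> specs =
        Arrays.asList(new ReadSpec("users", 0, 5), new ReadSpec("orders", 0, 3));
    System.out.println(readAll(specs, 2, 7L));
  }
}
```

Note that the boundaries live inside the spec itself, which is the "push some information into normal Reads" part of the caveat.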
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow this ReadAll
> pattern. RedisIO
> >>>>>>>>>> and CassandraIO already have WIP PRs to do so. So I wanted to
> bring this subject
> >>>>>>>>>> to the mailing list to see your opinions, and if you see any
> sort of issues that
> >>>>>>>>>> we might be missing with this idea.
> >>>>>>>>>>
> >>>>>>>>>> Also I would like to see if we have consensus to start
> consistently using the
> >>>>>>>>>> terminology of ReadAll transforms based on Read and the
> readAll() method for new
> >>>>>>>>>> IOs (at this point undoing this in the only remaining
> inconsistent
> >>>>>>>>>> place, JdbcIO, might not be a good idea, but apart from this we
> should be ok).
> >>>>>>>>>>
> >>>>>>>>>> I mention this because the recent PR on KafkaIO based on SDF is
> doing something
> >>>>>>>>>> similar to the old pattern but is called ReadAll, and maybe
> it is worth being
> >>>>>>>>>> consistent for the benefit of users.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Ismaël
> >>
> >>
>
>
