I would also like to suggest that transforms that implement ReadAll via
Read should also provide methods like:

// Uses the specified values if unspecified in the input element from the
PCollection<Read>.
withDefaults(Read read);
// Uses the specified values regardless of what the input element from the
PCollection<Read> specifies.
withOverrides(Read read);

and only adds methods that are required at construction time (e.g. coders).
This way the majority of documentation sits on the Read transform.

On Fri, Jun 26, 2020 at 9:58 AM Luke Cwik <[email protected]> wrote:

> Ismael, it is good to hear that using Read as the input didn't have a
> bunch of parameters that were being skipped/ignored. Also, for the
> polymorphism issue you have to rely on the user correctly telling you the
> type in such a way where it is a common ancestor of all the runtime types
> that will ever be used. This usually boils down to something like
> Serializable or DynamicMessage such that the coder that is chosen works for
> all the runtime types. Using multiple types is a valid use case and would
> allow for a simpler graph with less flattens merging the output from
> multiple sources.
>
> Boyuan, as you have mentioned we can have a coder for KafkaIO.Read which
> uses schemas even if some of the parameters can't be represented in a
> meaningful way beyond "bytes". This would be helpful for cross language as
> well since every parameter would become available if a language could
> support it (e.g. it could serialize a java function up front and keep it
> saved as raw bytes within said language). Even if we figure out a better
> way to do this in the future, we'll have to change the schema for the new
> way anyway. This would mean that the external version of the transform
> adopts Row to Read and we drop KafkaSourceDescriptor. The conversion from
> Row to Read could validate that the parameters make sense (e.g. the bytes
> are valid serialized functions). The addition of an
> endReadTime/endReadOffset would make sense for KafkaIO.Read as well and
> this would enable having a bounded version that could be used for backfills
> (this doesn't have to be done as part of any current ongoing PR).
> Essentially any parameter that could be added for a single instance of a
> Kafka element+restriction would also make sense to the KafkaIO.Read
> transform since it too is a single instance. There are parameters that
> would apply to the ReadAll that wouldn't apply to a read and these would be
> global parameters across all element+restriction pairs such as config
> overrides or default values.
>
> I am convinced that we should do as Ismael is suggesting and use
> KafkaIO.Read as the type.
>
>
> On Thu, Jun 25, 2020 at 6:00 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>> Discussion regarding cross-language transforms is a slight tangent here.
>> But I think, in general, it's great if we can use existing transforms (for
>> example, IO connectors) as cross-language transforms without having to
>> build more composites (irrespective of whether in ExternalTransformBuilders
>> or a user pipelines) just to make them cross-language compatible. A future
>> cross-language compatible SchemaCoder might help (assuming that works for
>> Read transform) but I'm not sure we have a good idea when we'll get to that
>> state.
>>
>> Thanks,
>> Cham
>>
>> On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang <[email protected]> wrote:
>>
>>> For unbounded SDF in Kafka, we also consider the upgrading/downgrading
>>> compatibility in the pipeline update scenario(For detailed discussion,
>>> please refer to
>>> https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E).
>>> In order to obtain the compatibility, it requires the input of the read SDF
>>> is schema-aware.
>>>
>>> Thus the major constraint of mapping KafkaSourceDescriptor to
>>> PCollection<Read> is, the KafkaIO.Read also needs to be schema-aware,
>>> otherwise pipeline updates might fail unnecessarily. If looking into
>>> KafkaIO.Read, not all necessary fields are compatible with schema, for
>>> example, SerializedFunction.
>>>
>>> I'm kind of confused by why ReadAll<Read, OutputT> is a common pattern
>>> for SDF based IO. The Read can be a common pattern because the input is
>>> always a PBegin. But for an SDF based IO, the input can be anything. By
>>> using Read as input, we will still have the maintenance cost when SDF IO
>>> supports a new field but Read doesn't consume it. For example, we are
>>> discussing adding endOffset and endReadTime to KafkaSourceDescriptior,
>>> which is not used in KafkaIO.Read.
>>>
>>> On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía <[email protected]> wrote:
>>>
>>>> We forgot to mention (5) External.Config used in cross-lang, see KafkaIO
>>>> ExternalTransformBuilder. This approach is the predecessor of (4) and
>>>> probably a
>>>> really good candidate to be replaced by the Row based Configuration
>>>> Boyuan is
>>>> envisioning (so good to be aware of this).
>>>>
>>>> Thanks for the clear explanation Luke you mention the real issue(s).
>>>> All the
>>>> approaches discussed so far in the end could be easily transformed to
>>>> produce a
>>>> PCollection<Read> and those Read Elements could be read by the generic
>>>> ReadAll
>>>> transform. Notice that this can be internal in some IOs e.g. KafkaIO if
>>>> they
>>>> decide not to expose it. I am not saying that we should force every IO
>>>> to
>>>> support ReadAll in its public API but if we do it is probably a good
>>>> idea to be
>>>> consistent with naming the transform that expects an input
>>>> PCollection<Read> in
>>>> the same way. Also notice that using it will save us of the maintenance
>>>> issues
>>>> discussed in my previous email.
>>>>
>>>> Back to the main concern: the consequences of expansion based on Read:
>>>> So far I
>>>> have not seen consequences for the Splitting part which maps really nice
>>>> assuming the Partition info / Restriction is available as part of Read.
>>>> So far
>>>> there are not Serialization because Beam is already enforcing this.
>>>> Notice that
>>>> ReadAll expansion is almost ‘equivalent’ to a poor man SDF at least for
>>>> the
>>>> Bounded case (see the code in my previous email). For the other points:
>>>>
>>>> > a) Do all properties set on a Read apply to the ReadAll? For example,
>>>> the
>>>> > Kafka Read implementation allows you to set the key and value
>>>> deserializers
>>>> > which are also used to dictate the output PCollection type. It also
>>>> allows you
>>>> > to set how the watermark should be computed. Technically a user may
>>>> want the
>>>> > watermark computation to be configurable per Read and they may also
>>>> want an
>>>> > output type which is polymorphic (e.g. Pcollection<Serializable>).
>>>>
>>>> Most of the times they do but for parametric types we cannot support
>>>> different
>>>> types in the outputs of the Read or at least I did not find how to do
>>>> so (is
>>>> there a way to use multiple output Coders on Beam?), we saw this in
>>>> CassandraIO
>>>> and we were discussing adding explicitly these Coders or Serializer
>>>> specific methods to the ReadAll transform. This is less nice because it
>>>> will
>>>> imply some repeated methods, but it is still a compromise to gain the
>>>> other
>>>> advantages. I suppose the watermark case you mention is similar because
>>>> you may
>>>> want the watermark to behave differently in each Read and we probably
>>>> don’t
>>>> support this, so it corresponds to the polymorphic category.
>>>>
>>>> > b) Read extends PTransform which brings its own object modelling
>>>> concerns.
>>>>
>>>> > During the implementations of ReadAll(PCollection<Read>), was it
>>>> discovered
>>>> > that some properties became runtime errors or were ignored if they
>>>> were set?
>>>> > If no, then the code deduplication is likely worth it because we also
>>>> get a
>>>> > lot of javadoc deduplication, but if yes is this an acceptable user
>>>> > experience?
>>>>
>>>> No, not so far. This is an interesting part, notice that the Read
>>>> translation
>>>> ends up delegating the read bits to the ReadFn part of ReadAll so the
>>>> ReadFn is
>>>> the real read and must be aware and use all the parameters.
>>>>
>>>>     @Override
>>>>     public PCollection<SolrDocument> expand(PBegin input) {
>>>>       return input.apply("Create", Create.of(this)).apply("ReadAll",
>>>> readAll());
>>>>     }
>>>>
>>>> I might be missing something for the Unbounded SDF case which is the
>>>> only case
>>>> we have not explored so far. I think one easy way to see the
>>>> limitations would
>>>> be in the ongoing KafkaIO SDF based implementation to try to map
>>>> KafkaSourceDescriptor to do the extra PCollection<Read> and the Read
>>>> logic on
>>>> the ReadAll with the SDF to see which constraints we hit, the
>>>> polymorphic ones
>>>> will be there for sure, maybe others will appear (not sure). However it
>>>> would be
>>>> interesting to see if we have a real gain in the maintenance points,
>>>> but well
>>>> let’s not forget also that KafkaIO has a LOT of knobs so probably the
>>>> generic
>>>> implementation could be relatively complex.
>>>>
>>>>
>>>>
>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <[email protected]> wrote:
>>>> >
>>>> > I had mentioned that approach 1 and approach 2 work for cross
>>>> language. The difference being that the cross language transform would take
>>>> a well known definition and convert it to the Read transform. A normal user
>>>> would have a pipeline that would look like:
>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) ->
>>>> PCollection<Output>
>>>> >
>>>> > And in the cross language case this would look like:
>>>> > 1: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to
>>>> Read) -> PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
>>>> > 2: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to
>>>> SourceDescriptor) -> PCollection<SourceDescriptor> -> PTransform(ReadAll)
>>>> -> PCollection<Output>*
>>>> > * note that PTransform(Convert Row to SourceDescriptor) only exists
>>>> since we haven't solved how to use schemas with language bound types in a
>>>> cross language way. SchemaCoder isn't portable but RowCoder is which is why
>>>> the conversion step exists. We could have a solution for this at some point
>>>> in time.
>>>> >
>>>> > My concern with using Read was around:
>>>> > a) Do all properties set on a Read apply to the ReadAll? For example,
>>>> the Kafka Read implementation allows you to set the key and value
>>>> deserializers which are also used to dictate the output PCollection type.
>>>> It also allows you to set how the watermark should be computed. Technically
>>>> a user may want the watermark computation to be configurable per Read and
>>>> they may also want an output type which is polymorphic (e.g.
>>>> PCollection<Serializable>).
>>>> > b) Read extends PTransform which brings its own object modelling
>>>> concerns.
>>>> >
>>>> > During the implementations of ReadAll(PCollection<Read>), was it
>>>> discovered that some properties became runtime errors or were ignored if
>>>> they were set? If no, then the code deduplication is likely worth it
>>>> because we also get a lot of javadoc deduplication, but if yes is this an
>>>> acceptable user experience?
>>>> >
>>>> >
>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <
>>>> [email protected]> wrote:
>>>> >>
>>>> >> I believe that the initial goal of unifying ReadAll as a general
>>>> "PTransform<PCollection<Read>, PCollection<OutputType>>” was to reduce the
>>>> amount of code duplication and error-prone approach related to this. It
>>>> makes much sense since usually we have all needed configuration set in Read
>>>> objects and, as Ismaeil mentioned, ReadAll will consist mostly of only
>>>> Split-Shuffle-Read stages.  So this case usually can be unified by using
>>>> PCollection<Read> as input.
>>>> >>
>>>> >> On the other hand, we have another need to use Java IOs as
>>>> cross-language transforms (as Luke described) which seems only partly in
>>>> common with previous pattern of ReadAll using.
>>>> >>
>>>> >> I’d be more in favour to have only one concept of read configuration
>>>> for all needs but seems it’s not easy and I’d be more in favour with Luke
>>>> and Boyuan approach with schema. Though, maybe ReadAll is not a very
>>>> suitable name in this case because it will can bring some confusions
>>>> related to previous pattern of ReadAll uses.
>>>> >>
>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang <[email protected]> wrote:
>>>> >>
>>>> >> Sorry for the typo. I mean I think we can go with (3) and (4): use
>>>> the data type that is schema-aware as the input of ReadAll.
>>>> >>
>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <[email protected]>
>>>> wrote:
>>>> >>>
>>>> >>> Thanks for the summary, Cham!
>>>> >>>
>>>> >>> I think we can go with (2) and (4): use the data type that is
>>>> schema-aware as the input of ReadAll.
>>>> >>>
>>>> >>> Converting Read into ReadAll helps us to stick with SDF-like IO.
>>>> But only having  (3) is not enough to solve the problem of using ReadAll in
>>>> x-lang case.
>>>> >>>
>>>> >>> The key point of ReadAll is that the input type of ReadAll should
>>>> be able to cross language boundaries and have compatibilities of
>>>> updating/downgrading. After investigating some possibilities(pure java pojo
>>>> with custom coder, protobuf, row/schema) in Kafka usage, we find that
>>>> row/schema fits our needs most. Here comes (4). I believe that using Read
>>>> as input of ReadAll makes sense in some cases, but I also think not all IOs
>>>> have the same need. I would treat Read as a special type as long as the
>>>> Read is schema-aware.
>>>> >>>
>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <
>>>> [email protected]> wrote:
>>>> >>>>
>>>> >>>> I see. So it seems like there are three options discussed so far
>>>> when it comes to defining source descriptors for ReadAll type transforms
>>>> >>>>
>>>> >>>> (1) Use Read PTransform as the element type of the input
>>>> PCollection
>>>> >>>> (2) Use a POJO that describes the source as the data element of
>>>> the input PCollection
>>>> >>>> (3) Provide a converter as a function to the Read transform which
>>>> essentially will convert it to a ReadAll (what Eugene mentioned)
>>>> >>>>
>>>> >>>> I feel like (3) is more suitable for a related set of source
>>>> descriptions such as files.
>>>> >>>> (1) will allow most code-reuse but seems like will make it hard to
>>>> use the ReadAll transform as a cross-language transform and will break the
>>>> separation of construction time and runtime constructs
>>>> >>>> (2) could result to less code reuse if not careful but will make
>>>> the transform easier to be used as a cross-language transform without
>>>> additional modifications
>>>> >>>>
>>>> >>>> Also, with SDF, we can create ReadAll-like transforms that are
>>>> more efficient. So we might be able to just define all sources in that
>>>> format and make Read transforms just an easy to use composite built on top
>>>> of that (by adding a preceding Create transform).
>>>> >>>>
>>>> >>>> Thanks,
>>>> >>>> Cham
>>>> >>>>
>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <[email protected]>
>>>> wrote:
>>>> >>>>>
>>>> >>>>> I believe we do require PTransforms to be serializable since
>>>> anonymous DoFns typically capture the enclosing PTransform.
>>>> >>>>>
>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <
>>>> [email protected]> wrote:
>>>> >>>>>>
>>>> >>>>>> Seems like Read in PCollection<Read> refers to a transform, at
>>>> least here:
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>> >>>>>>
>>>> >>>>>> I'm in favour of separating construction time transforms from
>>>> execution time data objects that we store in PCollections as Luke
>>>> mentioned. Also, we don't guarantee that PTransform is serializable so
>>>> users have the additional complexity of providing a corder whenever a
>>>> PTransform is used as a data object.
>>>> >>>>>> Also, agree with Boyuan that using simple Java objects that are
>>>> convertible to Beam Rows allow us to make these transforms available to
>>>> other SDKs through the cross-language transforms. Using transforms or
>>>> complex sources as data objects will probably make this difficult.
>>>> >>>>>>
>>>> >>>>>> Thanks,
>>>> >>>>>> Cham
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <
>>>> [email protected]> wrote:
>>>> >>>>>>>
>>>> >>>>>>> Hi Ismael,
>>>> >>>>>>>
>>>> >>>>>>> I think the ReadAll in the IO connector refers to the IO with
>>>> SDF implementation despite the type of input, where Read refers to
>>>> UnboundedSource.  One major pushback of using KafkaIO.Read as source
>>>> description is that not all configurations of KafkaIO.Read are meaningful
>>>> to populate during execution time. Also when thinking about x-lang useage,
>>>> making source description across language boundaries is also necessary.  As
>>>> Luke mentioned, it's quite easy to infer a Schema from an AutoValue object:
>>>> KafkaSourceDescription.java. Then the coder of this schema-aware object
>>>> will be a SchemaCoder. When crossing language boundaries, it's also easy to
>>>> convert a Row into the source description: Convert.fromRows.
>>>> >>>>>>>
>>>> >>>>>>>
>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <[email protected]>
>>>> wrote:
>>>> >>>>>>>>
>>>> >>>>>>>> To provide additional context, the KafkaIO ReadAll transform
>>>> takes a PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is a
>>>> POJO that contains the configurable parameters for reading from Kafka. This
>>>> is different from the pattern that Ismael listed because they take
>>>> PCollection<Read> as input and the Read is the same as the Read PTransform
>>>> class used for the non read all case.
>>>> >>>>>>>>
>>>> >>>>>>>> The KafkaSourceDescriptor does lead to duplication since
>>>> parameters used to configure the transform have to be copied over to the
>>>> source descriptor but decouples how a transform is specified from the
>>>> object that describes what needs to be done. I believe Ismael's point is
>>>> that we wouldn't need such a decoupling.
>>>> >>>>>>>>
>>>> >>>>>>>> Another area that hasn't been discussed and I believe is a
>>>> non-issue is that the Beam Java SDK has the most IO connectors and we would
>>>> want to use the IO implementations within Beam Go and Beam Python. This
>>>> brings in its own set of issues related to versioning and compatibility for
>>>> the wire format and how one parameterizes such transforms. The wire format
>>>> issue can be solved with either approach by making sure that the cross
>>>> language expansion always takes the well known format (whatever it may be)
>>>> and converts it into Read/KafkaSourceDescriptor/... object that is then
>>>> passed to the ReadAll transform. Boyuan has been looking to make the
>>>> KafkaSourceDescriptor have a schema so it can be represented as a row and
>>>> this can be done easily using the AutoValue integration (I don't believe
>>>> there is anything preventing someone from writing a schema row -> Read ->
>>>> row adapter or also using the AutoValue configuration if the transform is
>>>> also an AutoValue).
>>>> >>>>>>>>
>>>> >>>>>>>> I would be more for the code duplication and separation of
>>>> concerns provided by using a different object to represent the contents of
>>>> the PCollection from the pipeline construction time PTransform.
>>>> >>>>>>>>
>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <
>>>> [email protected]> wrote:
>>>> >>>>>>>>>
>>>> >>>>>>>>> Hi Ismael,
>>>> >>>>>>>>>
>>>> >>>>>>>>> Thanks for taking this on. Have you considered an approach
>>>> similar (or dual) to FileIO.write(), where we in a sense also have to
>>>> configure a dynamic number different IO transforms of the same type (file
>>>> writes)?
>>>> >>>>>>>>>
>>>> >>>>>>>>> E.g. how in this example we configure many aspects of many
>>>> file writes:
>>>> >>>>>>>>>
>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType,
>>>> Transaction>writeDynamic()
>>>> >>>>>>>>>      .by(Transaction::getType)
>>>> >>>>>>>>>      .via(tx -> tx.getType().toFields(tx),  // Convert the
>>>> data to be written to CSVSink
>>>> >>>>>>>>>           type -> new CSVSink(type.getFieldNames()))
>>>> >>>>>>>>>      .to(".../path/to/")
>>>> >>>>>>>>>      .withNaming(type -> defaultNaming(type +
>>>> "-transactions", ".csv"));
>>>> >>>>>>>>>
>>>> >>>>>>>>> we could do something similar for many JdbcIO reads:
>>>> >>>>>>>>>
>>>> >>>>>>>>> PCollection<Bar> bars;  // user-specific type from which all
>>>> the read parameters can be inferred
>>>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>>>> >>>>>>>>>   .fromQuery(bar -> ...compute query for this bar...)
>>>> >>>>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
>>>> >>>>>>>>>   .withBatchSize(bar -> ...compute batch size for this bar...)
>>>> >>>>>>>>>   ...etc);
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <
>>>> [email protected]> wrote:
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Hello,
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> (my excuses for the long email but this requires context)
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> As part of the move from Source based IOs to DoFn based
>>>> ones. One pattern
>>>> >>>>>>>>>> emerged due to the composable nature of DoFn. The idea is to
>>>> have a different
>>>> >>>>>>>>>> kind of composable reads where we take a PCollection of
>>>> different sorts of
>>>> >>>>>>>>>> intermediate specifications e.g. tables, queries, etc, for
>>>> example:
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> JdbcIO:
>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends
>>>> >>>>>>>>>> PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> RedisIO:
>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<String>,
>>>> PCollection<KV<String, String>>>
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> HBaseIO:
>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>,
>>>> PCollection<Result>>
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> These patterns enabled richer use cases like doing multiple
>>>> queries in the same
>>>> >>>>>>>>>> Pipeline, querying based on key patterns or querying from
>>>> multiple tables at the
>>>> >>>>>>>>>> same time but came with some maintenance issues:
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> - We ended up needing to add to the ReadAll transforms the
>>>> parameters for
>>>> >>>>>>>>>>   missing information so we ended up with lots of duplicated
>>>> with methods and
>>>> >>>>>>>>>>   error-prone code from the Read transforms into the ReadAll
>>>> transforms.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> - When you require new parameters you have to expand the
>>>> input parameters of the
>>>> >>>>>>>>>>   intermediary specification into something that resembles
>>>> the full `Read`
>>>> >>>>>>>>>>   definition for example imagine you want to read from
>>>> multiple tables or
>>>> >>>>>>>>>>   servers as part of the same pipeline but this was not in
>>>> the intermediate
>>>> >>>>>>>>>>   specification you end up adding those extra methods
>>>> (duplicating more code)
>>>> >>>>>>>>>>   just o get close to the be like the Read full spec.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> - If new parameters are added to the Read method we end up
>>>> adding them
>>>> >>>>>>>>>>   systematically to the ReadAll transform too so they are
>>>> taken into account.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Due to these issues I recently did a change to test a new
>>>> approach that is
>>>> >>>>>>>>>> simpler, more complete and maintainable. The code became:
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> HBaseIO:
>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>,
>>>> PCollection<Result>>
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> With this approach users gain benefits of improvements on
>>>> parameters of normal
>>>> >>>>>>>>>> Read because they count with the full Read parameters. But
>>>> of course there are
>>>> >>>>>>>>>> some minor caveats:
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> 1. You need to push some information into normal Reads for
>>>> example
>>>> >>>>>>>>>>    partition boundaries information or Restriction
>>>> information (in the SDF
>>>> >>>>>>>>>>    case).  Notice that this consistent approach of ReadAll
>>>> produces a simple
>>>> >>>>>>>>>>    pattern that ends up being almost reusable between IOs
>>>> (e.g. the    non-SDF
>>>> >>>>>>>>>>    case):
>>>> >>>>>>>>>>
>>>> >>>>>>>>>>   public static class ReadAll extends
>>>> PTransform<PCollection<Read>,
>>>> >>>>>>>>>> PCollection<SolrDocument>> {
>>>> >>>>>>>>>>     @Override
>>>> >>>>>>>>>>     public PCollection<SolrDocument>
>>>> expand(PCollection<Read> input) {
>>>> >>>>>>>>>>       return input
>>>> >>>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>>> >>>>>>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>>>> >>>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>>> >>>>>>>>>>     }
>>>> >>>>>>>>>>   }
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> 2. If you are using Generic types for the results ReadAll
>>>> you must have the
>>>> >>>>>>>>>>    Coders used in its definition and require consistent
>>>> types from the data
>>>> >>>>>>>>>>    sources, in practice this means we need to add extra
>>>> withCoder method(s) on
>>>> >>>>>>>>>>    ReadAll but not the full specs.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow this ReadAll
>>>> pattern. RedisIO
>>>> >>>>>>>>>> and CassandraIO have already WIP PRs to do so. So I wanted
>>>> to bring this subject
>>>> >>>>>>>>>> to the mailing list to see your opinions, and if you see any
>>>> sort of issues that
>>>> >>>>>>>>>> we might be missing with this idea.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Also I would like to see if we have consensus to start using
>>>> consistently the
>>>> >>>>>>>>>> terminology of ReadAll transforms based on Read and the
>>>> readAll() method for new
>>>> >>>>>>>>>> IOs (at this point probably outdoing this in the only
>>>> remaining inconsistent
>>>> >>>>>>>>>> place in JdbcIO might not be a good idea but apart of this
>>>> we should be ok).
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> I mention this because the recent PR on KafkaIO based on SDF
>>>> is doing something
>>>> >>>>>>>>>> similar to the old pattern but being called ReadAll and
>>>> maybe it is worth to be
>>>> >>>>>>>>>> consistent for the benefit of users.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Regards,
>>>> >>>>>>>>>> Ismaël
>>>> >>
>>>> >>
>>>>
>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <[email protected]> wrote:
>>>> >
>>>> > I had mentioned that approach 1 and approach 2 work for cross
>>>> language. The difference being that the cross language transform would take
>>>> a well known definition and convert it to the Read transform. A normal user
>>>> would have a pipeline that would look like:
>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) ->
>>>> PCollection<Output>
>>>> >
>>>> > And in the cross language case this would look like:
>>>> > 1: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to
>>>> Read) -> PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
>>>> > 2: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to
>>>> SourceDescriptor) -> PCollection<SourceDescriptor> -> PTransform(ReadAll)
>>>> -> PCollection<Output>*
>>>> > * note that PTransform(Convert Row to SourceDescriptor) only exists
>>>> since we haven't solved how to use schemas with language bound types in a
>>>> cross language way. SchemaCoder isn't portable but RowCoder is which is why
>>>> the conversion step exists. We could have a solution for this at some point
>>>> in time.
>>>> >
>>>> > My concern with using Read was around:
>>>> > a) Do all properties set on a Read apply to the ReadAll? For example,
>>>> the Kafka Read implementation allows you to set the key and value
>>>> deserializers which are also used to dictate the output PCollection type.
>>>> It also allows you to set how the watermark should be computed. Technically
>>>> a user may want the watermark computation to be configurable per Read and
>>>> they may also want an output type which is polymorphic (e.g.
>>>> PCollection<Serializable>).
>>>> > b) Read extends PTransform which brings its own object modelling
>>>> concerns.
>>>> >
>>>> > During the implementations of ReadAll(PCollection<Read>), was it
>>>> discovered that some properties became runtime errors or were ignored if
>>>> they were set? If no, then the code deduplication is likely worth it
>>>> because we also get a lot of javadoc deduplication, but if yes is this an
>>>> acceptable user experience?
>>>> >
>>>> >
>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <
>>>> [email protected]> wrote:
>>>> >>
>>>> >> I believe that the initial goal of unifying ReadAll as a general
>>>> "PTransform<PCollection<Read>, PCollection<OutputType>>” was to reduce the
>>>> amount of code duplication and error-prone approach related to this. It
>>>> makes much sense since usually we have all needed configuration set in Read
>>>> objects and, as Ismaeil mentioned, ReadAll will consist mostly of only
>>>> Split-Shuffle-Read stages.  So this case usually can be unified by using
>>>> PCollection<Read> as input.
>>>> >>
>>>> >> On the other hand, we have another need to use Java IOs as
>>>> cross-language transforms (as Luke described) which seems only partly in
>>>> common with previous pattern of ReadAll using.
>>>> >>
>>>> >> I’d be more in favour to have only one concept of read configuration
>>>> for all needs but seems it’s not easy and I’d be more in favour with Luke
>>>> and Boyuan approach with schema. Though, maybe ReadAll is not a very
>>>> suitable name in this case because it will can bring some confusions
>>>> related to previous pattern of ReadAll uses.
>>>> >>
>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang <[email protected]> wrote:
>>>> >>
>>>> >> Sorry for the typo. I mean I think we can go with (3) and (4): use
>>>> the data type that is schema-aware as the input of ReadAll.
>>>> >>
>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <[email protected]>
>>>> wrote:
>>>> >>>
>>>> >>> Thanks for the summary, Cham!
>>>> >>>
>>>> >>> I think we can go with (2) and (4): use the data type that is
>>>> schema-aware as the input of ReadAll.
>>>> >>>
>>>> >>> Converting Read into ReadAll helps us to stick with SDF-like IO.
>>>> But only having  (3) is not enough to solve the problem of using ReadAll in
>>>> x-lang case.
>>>> >>>
>>>> >>> The key point of ReadAll is that the input type of ReadAll should
>>>> be able to cross language boundaries and have compatibilities of
>>>> updating/downgrading. After investigating some possibilities(pure java pojo
>>>> with custom coder, protobuf, row/schema) in Kafka usage, we find that
>>>> row/schema fits our needs most. Here comes (4). I believe that using Read
>>>> as input of ReadAll makes sense in some cases, but I also think not all IOs
>>>> have the same need. I would treat Read as a special type as long as the
>>>> Read is schema-aware.
>>>> >>>
>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <
>>>> [email protected]> wrote:
>>>> >>>>
>>>> >>>> I see. So it seems like there are three options discussed so far
>>>> when it comes to defining source descriptors for ReadAll type transforms
>>>> >>>>
>>>> >>>> (1) Use Read PTransform as the element type of the input
>>>> PCollection
>>>> >>>> (2) Use a POJO that describes the source as the data element of
>>>> the input PCollection
>>>> >>>> (3) Provide a converter as a function to the Read transform which
>>>> essentially will convert it to a ReadAll (what Eugene mentioned)
>>>> >>>>
>>>> >>>> I feel like (3) is more suitable for a related set of source
>>>> descriptions such as files.
>>>> >>>> (1) will allow most code-reuse but seems like will make it hard to
>>>> use the ReadAll transform as a cross-language transform and will break the
>>>> separation of construction time and runtime constructs
>>>> >>>> (2) could result to less code reuse if not careful but will make
>>>> the transform easier to be used as a cross-language transform without
>>>> additional modifications
>>>> >>>>
>>>> >>>> Also, with SDF, we can create ReadAll-like transforms that are
>>>> more efficient. So we might be able to just define all sources in that
>>>> format and make Read transforms just an easy to use composite built on top
>>>> of that (by adding a preceding Create transform).
>>>> >>>>
>>>> >>>> Thanks,
>>>> >>>> Cham
>>>> >>>>
>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <[email protected]>
>>>> wrote:
>>>> >>>>>
>>>> >>>>> I believe we do require PTransforms to be serializable since
>>>> anonymous DoFns typically capture the enclosing PTransform.
>>>> >>>>>
>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <
>>>> [email protected]> wrote:
>>>> >>>>>>
>>>> >>>>>> Seems like Read in PCollection<Read> refers to a transform, at
>>>> least here:
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>> >>>>>>
>>>> >>>>>> I'm in favour of separating construction time transforms from
>>>> execution time data objects that we store in PCollections as Luke
>>>> mentioned. Also, we don't guarantee that PTransform is serializable so
>>>> users have the additional complexity of providing a corder whenever a
>>>> PTransform is used as a data object.
>>>> >>>>>> Also, agree with Boyuan that using simple Java objects that are
>>>> convertible to Beam Rows allow us to make these transforms available to
>>>> other SDKs through the cross-language transforms. Using transforms or
>>>> complex sources as data objects will probably make this difficult.
>>>> >>>>>>
>>>> >>>>>> Thanks,
>>>> >>>>>> Cham
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <
>>>> [email protected]> wrote:
>>>> >>>>>>>
>>>> >>>>>>> Hi Ismael,
>>>> >>>>>>>
>>>> >>>>>>> I think the ReadAll in the IO connector refers to the IO with
>>>> SDF implementation despite the type of input, where Read refers to
>>>> UnboundedSource.  One major pushback of using KafkaIO.Read as source
>>>> description is that not all configurations of KafkaIO.Read are meaningful
>>>> to populate during execution time. Also when thinking about x-lang useage,
>>>> making source description across language boundaries is also necessary.  As
>>>> Luke mentioned, it's quite easy to infer a Schema from an AutoValue object:
>>>> KafkaSourceDescription.java. Then the coder of this schema-aware object
>>>> will be a SchemaCoder. When crossing language boundaries, it's also easy to
>>>> convert a Row into the source description: Convert.fromRows.
>>>> >>>>>>>
>>>> >>>>>>>
>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <[email protected]>
>>>> wrote:
>>>> >>>>>>>>
>>>> >>>>>>>> To provide additional context, the KafkaIO ReadAll transform
>>>> takes a PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is a
>>>> POJO that contains the configurable parameters for reading from Kafka. This
>>>> is different from the pattern that Ismael listed because they take
>>>> PCollection<Read> as input and the Read is the same as the Read PTransform
>>>> class used for the non read all case.
>>>> >>>>>>>>
>>>> >>>>>>>> The KafkaSourceDescriptor does lead to duplication since
>>>> parameters used to configure the transform have to be copied over to the
>>>> source descriptor but decouples how a transform is specified from the
>>>> object that describes what needs to be done. I believe Ismael's point is
>>>> that we wouldn't need such a decoupling.
>>>> >>>>>>>>
>>>> >>>>>>>> Another area that hasn't been discussed and I believe is a
>>>> non-issue is that the Beam Java SDK has the most IO connectors and we would
>>>> want to use the IO implementations within Beam Go and Beam Python. This
>>>> brings in its own set of issues related to versioning and compatibility for
>>>> the wire format and how one parameterizes such transforms. The wire format
>>>> issue can be solved with either approach by making sure that the cross
>>>> language expansion always takes the well known format (whatever it may be)
>>>> and converts it into Read/KafkaSourceDescriptor/... object that is then
>>>> passed to the ReadAll transform. Boyuan has been looking to make the
>>>> KafkaSourceDescriptor have a schema so it can be represented as a row and
>>>> this can be done easily using the AutoValue integration (I don't believe
>>>> there is anything preventing someone from writing a schema row -> Read ->
>>>> row adapter or also using the AutoValue configuration if the transform is
>>>> also an AutoValue).
>>>> >>>>>>>>
>>>> >>>>>>>> I would be more for the code duplication and separation of
>>>> concerns provided by using a different object to represent the contents of
>>>> the PCollection from the pipeline construction time PTransform.
>>>> >>>>>>>>
>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <
>>>> [email protected]> wrote:
>>>> >>>>>>>>>
>>>> >>>>>>>>> Hi Ismael,
>>>> >>>>>>>>>
>>>> >>>>>>>>> Thanks for taking this on. Have you considered an approach
>>>> similar (or dual) to FileIO.write(), where we in a sense also have to
>>>> configure a dynamic number different IO transforms of the same type (file
>>>> writes)?
>>>> >>>>>>>>>
>>>> >>>>>>>>> E.g. how in this example we configure many aspects of many
>>>> file writes:
>>>> >>>>>>>>>
>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType,
>>>> Transaction>writeDynamic()
>>>> >>>>>>>>>      .by(Transaction::getType)
>>>> >>>>>>>>>      .via(tx -> tx.getType().toFields(tx),  // Convert the
>>>> data to be written to CSVSink
>>>> >>>>>>>>>           type -> new CSVSink(type.getFieldNames()))
>>>> >>>>>>>>>      .to(".../path/to/")
>>>> >>>>>>>>>      .withNaming(type -> defaultNaming(type +
>>>> "-transactions", ".csv"));
>>>> >>>>>>>>>
>>>> >>>>>>>>> we could do something similar for many JdbcIO reads:
>>>> >>>>>>>>>
>>>> >>>>>>>>> PCollection<Bar> bars;  // user-specific type from which all
>>>> the read parameters can be inferred
>>>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>>>> >>>>>>>>>   .fromQuery(bar -> ...compute query for this bar...)
>>>> >>>>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
>>>> >>>>>>>>>   .withBatchSize(bar -> ...compute batch size for this bar...)
>>>> >>>>>>>>>   ...etc);
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <
>>>> [email protected]> wrote:
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Hello,
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> (my excuses for the long email but this requires context)
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> As part of the move from Source based IOs to DoFn based
>>>> ones. One pattern
>>>> >>>>>>>>>> emerged due to the composable nature of DoFn. The idea is to
>>>> have a different
>>>> >>>>>>>>>> kind of composable reads where we take a PCollection of
>>>> different sorts of
>>>> >>>>>>>>>> intermediate specifications e.g. tables, queries, etc, for
>>>> example:
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> JdbcIO:
>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends
>>>> >>>>>>>>>> PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> RedisIO:
>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<String>,
>>>> PCollection<KV<String, String>>>
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> HBaseIO:
>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>,
>>>> PCollection<Result>>
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> These patterns enabled richer use cases like doing multiple
>>>> queries in the same
>>>> >>>>>>>>>> Pipeline, querying based on key patterns or querying from
>>>> multiple tables at the
>>>> >>>>>>>>>> same time but came with some maintenance issues:
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> - We ended up needing to add to the ReadAll transforms the
>>>> parameters for
>>>> >>>>>>>>>>   missing information so we ended up with lots of duplicated
>>>> with methods and
>>>> >>>>>>>>>>   error-prone code from the Read transforms into the ReadAll
>>>> transforms.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> - When you require new parameters you have to expand the
>>>> input parameters of the
>>>> >>>>>>>>>>   intermediary specification into something that resembles
>>>> the full `Read`
>>>> >>>>>>>>>>   definition for example imagine you want to read from
>>>> multiple tables or
>>>> >>>>>>>>>>   servers as part of the same pipeline but this was not in
>>>> the intermediate
>>>> >>>>>>>>>>   specification you end up adding those extra methods
>>>> (duplicating more code)
>>>> >>>>>>>>>>   just o get close to the be like the Read full spec.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> - If new parameters are added to the Read method we end up
>>>> adding them
>>>> >>>>>>>>>>   systematically to the ReadAll transform too so they are
>>>> taken into account.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Due to these issues I recently did a change to test a new
>>>> approach that is
>>>> >>>>>>>>>> simpler, more complete and maintainable. The code became:
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> HBaseIO:
>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>,
>>>> PCollection<Result>>
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> With this approach users gain benefits of improvements on
>>>> parameters of normal
>>>> >>>>>>>>>> Read because they count with the full Read parameters. But
>>>> of course there are
>>>> >>>>>>>>>> some minor caveats:
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> 1. You need to push some information into normal Reads for
>>>> example
>>>> >>>>>>>>>>    partition boundaries information or Restriction
>>>> information (in the SDF
>>>> >>>>>>>>>>    case).  Notice that this consistent approach of ReadAll
>>>> produces a simple
>>>> >>>>>>>>>>    pattern that ends up being almost reusable between IOs
>>>> (e.g. the    non-SDF
>>>> >>>>>>>>>>    case):
>>>> >>>>>>>>>>
>>>> >>>>>>>>>>   public static class ReadAll extends
>>>> PTransform<PCollection<Read>,
>>>> >>>>>>>>>> PCollection<SolrDocument>> {
>>>> >>>>>>>>>>     @Override
>>>> >>>>>>>>>>     public PCollection<SolrDocument>
>>>> expand(PCollection<Read> input) {
>>>> >>>>>>>>>>       return input
>>>> >>>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>>> >>>>>>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>>>> >>>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>>> >>>>>>>>>>     }
>>>> >>>>>>>>>>   }
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> 2. If you are using Generic types for the results ReadAll
>>>> you must have the
>>>> >>>>>>>>>>    Coders used in its definition and require consistent
>>>> types from the data
>>>> >>>>>>>>>>    sources, in practice this means we need to add extra
>>>> withCoder method(s) on
>>>> >>>>>>>>>>    ReadAll but not the full specs.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow this ReadAll
>>>> pattern. RedisIO
>>>> >>>>>>>>>> and CassandraIO have already WIP PRs to do so. So I wanted
>>>> to bring this subject
>>>> >>>>>>>>>> to the mailing list to see your opinions, and if you see any
>>>> sort of issues that
>>>> >>>>>>>>>> we might be missing with this idea.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Also I would like to see if we have consensus to start using
>>>> consistently the
>>>> >>>>>>>>>> terminology of ReadAll transforms based on Read and the
>>>> readAll() method for new
>>>> >>>>>>>>>> IOs (at this point probably outdoing this in the only
>>>> remaining inconsistent
>>>> >>>>>>>>>> place in JdbcIO might not be a good idea but apart of this
>>>> we should be ok).
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> I mention this because the recent PR on KafkaIO based on SDF
>>>> is doing something
>>>> >>>>>>>>>> similar to the old pattern but being called ReadAll and
>>>> maybe it is worth to be
>>>> >>>>>>>>>> consistent for the benefit of users.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Regards,
>>>> >>>>>>>>>> Ismaël
>>>> >>
>>>> >>
>>>>
>>>>

Reply via email to