For the unbounded SDF in Kafka, we also consider upgrade/downgrade compatibility in the pipeline update scenario (for a detailed discussion, please refer to https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E). Obtaining this compatibility requires that the input of the read SDF be schema-aware.
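To make the schema-aware requirement concrete, here is a minimal sketch of such an input element, assuming Beam's AutoValue schema inference; the class name and fields are hypothetical placeholders, loosely modeled on the KafkaSourceDescription mentioned later in this thread:

    import com.google.auto.value.AutoValue;
    import org.apache.beam.sdk.schemas.AutoValueSchema;
    import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
    import org.checkerframework.checker.nullness.qual.Nullable;

    // @DefaultSchema lets Beam infer a Schema (and thus a SchemaCoder / Row
    // representation) from the AutoValue fields, which is what makes the
    // element update-compatible and able to cross language boundaries.
    @DefaultSchema(AutoValueSchema.class)
    @AutoValue
    public abstract class MySourceDescriptor {
      public abstract String getTopic();
      public abstract @Nullable Integer getPartition();
      public abstract @Nullable Long getStartOffset();

      public static MySourceDescriptor of(
          String topic, @Nullable Integer partition, @Nullable Long startOffset) {
        return new AutoValue_MySourceDescriptor(topic, partition, startOffset);
      }
    }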
Thus the major constraint of mapping KafkaSourceDescriptor to PCollection<Read> is that KafkaIO.Read also needs to be schema-aware, otherwise pipeline updates might fail unnecessarily. Looking into KafkaIO.Read, not all of the necessary fields are compatible with schemas, for example SerializableFunction. I am also not sure why ReadAll<Read, OutputT> should be a common pattern for SDF-based IO. Read can be a common pattern because its input is always a PBegin, but for an SDF-based IO the input can be anything. By using Read as input we still carry a maintenance cost whenever the SDF IO supports a new field that Read doesn't consume. For example, we are discussing adding endOffset and endReadTime to KafkaSourceDescriptor, neither of which is used in KafkaIO.Read.

On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía <ieme...@gmail.com> wrote:

> We forgot to mention (5) External.Config used in cross-lang, see KafkaIO ExternalTransformBuilder. This approach is the predecessor of (4) and probably a really good candidate to be replaced by the Row-based configuration Boyuan is envisioning (so it is good to be aware of this).
>
> Thanks for the clear explanation, Luke, you mention the real issue(s). All the approaches discussed so far could in the end easily be transformed to produce a PCollection<Read>, and those Read elements could be read by the generic ReadAll transform. Notice that this can be internal in some IOs, e.g. KafkaIO, if they decide not to expose it. I am not saying that we should force every IO to support ReadAll in its public API, but if we do, it is probably a good idea to be consistent and name the transform that expects an input PCollection<Read> in the same way. Also notice that using it will save us from the maintenance issues discussed in my previous email.
>
> Back to the main concern, the consequences of expansion based on Read: so far I have not seen consequences for the Splitting part, which maps really nicely assuming the Partition info / Restriction is available as part of Read. So far there are no Serialization issues because Beam is already enforcing this. Notice that the ReadAll expansion is almost 'equivalent' to a poor man's SDF, at least for the Bounded case (see the code in my previous email). For the other points:
>
> > a) Do all properties set on a Read apply to the ReadAll? For example, the Kafka Read implementation allows you to set the key and value deserializers which are also used to dictate the output PCollection type. It also allows you to set how the watermark should be computed. Technically a user may want the watermark computation to be configurable per Read and they may also want an output type which is polymorphic (e.g. PCollection<Serializable>).
>
> Most of the time they do, but for parametric types we cannot support different types in the outputs of the Read, or at least I did not find how to do so (is there a way to use multiple output Coders in Beam?). We saw this in CassandraIO and we were discussing adding these Coders, or Serializer-specific methods, explicitly to the ReadAll transform. This is less nice because it implies some repeated methods, but it is still a compromise to gain the other advantages. I suppose the watermark case you mention is similar, because you may want the watermark to behave differently in each Read and we probably don't support this, so it corresponds to the polymorphic category.
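As a hedged sketch of the "explicit Coders on ReadAll" compromise just described: since a single ReadAll cannot infer a different output Coder per Read element, one Coder is supplied up front for the whole transform. Here Read, SplitFn, and ReadFn stand for the per-IO spec and DoFns shown later in this thread, and withCoder itself is a hypothetical method, not an existing API:

    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.values.PCollection;

    // Hypothetical generic ReadAll: a single output Coder is fixed at
    // construction time for the whole transform, which is why per-Read
    // polymorphic outputs (e.g. PCollection<Serializable>) cannot be supported.
    public static class ReadAll<T> extends PTransform<PCollection<Read>, PCollection<T>> {
      private final Coder<T> coder;

      private ReadAll(Coder<T> coder) {
        this.coder = coder;
      }

      public static <T> ReadAll<T> withCoder(Coder<T> coder) {
        return new ReadAll<>(coder);
      }

      @Override
      public PCollection<T> expand(PCollection<Read> input) {
        return input
            .apply("Split", ParDo.of(new SplitFn()))      // DoFn<Read, Read>
            .apply("Reshuffle", Reshuffle.viaRandomKey())
            .apply("Read", ParDo.of(new ReadFn<T>()))     // DoFn<Read, T>
            .setCoder(coder);                             // one Coder for all Reads
      }
    }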
> > b) Read extends PTransform which brings its own object modelling concerns.
> >
> > During the implementations of ReadAll(PCollection<Read>), was it discovered that some properties became runtime errors or were ignored if they were set? If no, then the code deduplication is likely worth it because we also get a lot of javadoc deduplication, but if yes is this an acceptable user experience?
>
> No, not so far. This is an interesting part. Notice that the Read translation ends up delegating the read bits to the ReadFn part of ReadAll, so the ReadFn is the real read and must be aware of and use all the parameters.
>
>   @Override
>   public PCollection<SolrDocument> expand(PBegin input) {
>     return input.apply("Create", Create.of(this)).apply("ReadAll", readAll());
>   }
>
> I might be missing something for the Unbounded SDF case, which is the only case we have not explored so far. I think one easy way to see the limitations would be to try, in the ongoing KafkaIO SDF-based implementation, to map KafkaSourceDescriptor to the extra PCollection<Read> and the Read logic on the ReadAll with the SDF, to see which constraints we hit; the polymorphic ones will be there for sure, and maybe others will appear (not sure). However, it would be interesting to see if we have a real gain on the maintenance points, but let's not forget that KafkaIO has a LOT of knobs, so the generic implementation could probably be relatively complex.
>
> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <lc...@google.com> wrote:
> >
> > I had mentioned that approach 1 and approach 2 work for cross language. The difference being that the cross-language transform would take a well-known definition and convert it to the Read transform. A normal user would have a pipeline that would look like:
> >
> > 1: PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) -> PCollection<Output>
> >
> > And in the cross-language case this would look like:
> >
> > 1: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to Read) -> PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
> > 2: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to SourceDescriptor) -> PCollection<SourceDescriptor> -> PTransform(ReadAll) -> PCollection<Output>*
> >
> > * note that PTransform(Convert Row to SourceDescriptor) only exists since we haven't solved how to use schemas with language-bound types in a cross-language way. SchemaCoder isn't portable but RowCoder is, which is why the conversion step exists. We could have a solution for this at some point in time.
> >
> > My concern with using Read was around:
> > a) Do all properties set on a Read apply to the ReadAll? For example, the Kafka Read implementation allows you to set the key and value deserializers which are also used to dictate the output PCollection type. It also allows you to set how the watermark should be computed. Technically a user may want the watermark computation to be configurable per Read and they may also want an output type which is polymorphic (e.g. PCollection<Serializable>).
> > b) Read extends PTransform which brings its own object modelling concerns.
> > During the implementations of ReadAll(PCollection<Read>), was it discovered that some properties became runtime errors or were ignored if they were set? If no, then the code deduplication is likely worth it because we also get a lot of javadoc deduplication, but if yes is this an acceptable user experience?
> >
> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:
> >>
> >> I believe that the initial goal of unifying ReadAll as a general "PTransform<PCollection<Read>, PCollection<OutputType>>" was to reduce the amount of code duplication and the error-prone approach related to it. That makes much sense, since usually we have all the needed configuration set in Read objects and, as Ismaël mentioned, ReadAll will consist mostly of only Split-Shuffle-Read stages. So this case can usually be unified by using PCollection<Read> as input.
> >>
> >> On the other hand, we have another need, to use Java IOs as cross-language transforms (as Luke described), which seems only partly in common with the previous pattern of using ReadAll.
> >>
> >> I'd be more in favour of having only one concept of read configuration for all needs, but it seems that is not easy, so I'd side with Luke and Boyuan's approach with schemas. Though maybe ReadAll is not a very suitable name in this case, because it can bring some confusion related to the previous pattern of ReadAll uses.
> >>
> >> On 25 Jun 2020, at 05:00, Boyuan Zhang <boyu...@google.com> wrote:
> >>
> >> Sorry for the typo. I meant I think we can go with (3) and (4): use the data type that is schema-aware as the input of ReadAll.
> >>
> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <boyu...@google.com> wrote:
> >>>
> >>> Thanks for the summary, Cham!
> >>>
> >>> I think we can go with (2) and (4): use the data type that is schema-aware as the input of ReadAll.
> >>>
> >>> Converting Read into ReadAll helps us to stick with SDF-like IO. But having only (3) is not enough to solve the problem of using ReadAll in the x-lang case.
> >>>
> >>> The key point of ReadAll is that the input type of ReadAll should be able to cross language boundaries and be compatible with updating/downgrading. After investigating some possibilities (pure Java POJO with a custom coder, protobuf, row/schema) in the Kafka usage, we found that row/schema fits our needs best. Here comes (4). I believe that using Read as the input of ReadAll makes sense in some cases, but I also think not all IOs have the same need. I would treat Read as a special type, as long as the Read is schema-aware.
> >>>
> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <chamik...@google.com> wrote:
> >>>>
> >>>> I see. So it seems like there are three options discussed so far when it comes to defining source descriptors for ReadAll-type transforms:
> >>>>
> >>>> (1) Use the Read PTransform as the element type of the input PCollection
> >>>> (2) Use a POJO that describes the source as the data element of the input PCollection
> >>>> (3) Provide a converter as a function to the Read transform which essentially will convert it to a ReadAll (what Eugene mentioned)
> >>>>
> >>>> I feel like (3) is more suitable for a related set of source descriptions such as files.
> >>>> (1) will allow the most code reuse, but it seems like it will make it hard to use the ReadAll transform as a cross-language transform, and it will break the separation of construction-time and runtime constructs.
> >>>> (2) could result in less code reuse if we are not careful, but it will make the transform easier to use as a cross-language transform without additional modifications.
> >>>>
> >>>> Also, with SDF, we can create ReadAll-like transforms that are more efficient. So we might be able to just define all sources in that format and make Read transforms just an easy-to-use composite built on top of that (by adding a preceding Create transform).
> >>>>
> >>>> Thanks,
> >>>> Cham
> >>>>
> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <lc...@google.com> wrote:
> >>>>>
> >>>>> I believe we do require PTransforms to be serializable, since anonymous DoFns typically capture the enclosing PTransform.
> >>>>>
> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <chamik...@google.com> wrote:
> >>>>>>
> >>>>>> Seems like Read in PCollection<Read> refers to a transform, at least here: https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
> >>>>>>
> >>>>>> I'm in favour of separating construction-time transforms from the execution-time data objects that we store in PCollections, as Luke mentioned. Also, we don't guarantee that PTransform is serializable, so users have the additional complexity of providing a coder whenever a PTransform is used as a data object.
> >>>>>> Also, I agree with Boyuan that using simple Java objects that are convertible to Beam Rows allows us to make these transforms available to other SDKs through cross-language transforms. Using transforms or complex sources as data objects will probably make this difficult.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Cham
> >>>>>>
> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <boyu...@google.com> wrote:
> >>>>>>>
> >>>>>>> Hi Ismael,
> >>>>>>>
> >>>>>>> I think the ReadAll in the IO connector refers to the IO with the SDF implementation, regardless of the input type, whereas Read refers to UnboundedSource. One major pushback on using KafkaIO.Read as the source description is that not all configurations of KafkaIO.Read are meaningful to populate during execution time. Also, when thinking about x-lang usage, making the source description work across language boundaries is necessary as well. As Luke mentioned, it's quite easy to infer a Schema from an AutoValue object: KafkaSourceDescription.java. Then the coder of this schema-aware object will be a SchemaCoder. When crossing language boundaries, it's also easy to convert a Row into the source description: Convert.fromRows.
> >>>>>>>
> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <lc...@google.com> wrote:
> >>>>>>>>
> >>>>>>>> To provide additional context, the KafkaIO ReadAll transform takes a PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is a POJO that contains the configurable parameters for reading from Kafka. This is different from the pattern that Ismael listed, because those take PCollection<Read> as input and the Read is the same Read PTransform class used for the non-read-all case.
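A minimal sketch of the Convert.fromRows step Boyuan mentions above, assuming the hypothetical MySourceDescriptor from earlier in this section stands in for KafkaSourceDescription:

    import org.apache.beam.sdk.schemas.transforms.Convert;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    // In the cross-language case the descriptors arrive as Rows (RowCoder is
    // portable, SchemaCoder is not), so the expansion converts them back into
    // the schema-aware descriptor before handing them to ReadAll.
    static PCollection<MySourceDescriptor> fromPortableRows(PCollection<Row> rows) {
      return rows.apply(Convert.fromRows(MySourceDescriptor.class));
    }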
> >>>>>>>> The KafkaSourceDescriptor does lead to duplication, since parameters used to configure the transform have to be copied over to the source descriptor, but it decouples how a transform is specified from the object that describes what needs to be done. I believe Ismael's point is that we wouldn't need such a decoupling.
> >>>>>>>>
> >>>>>>>> Another area that hasn't been discussed, and which I believe is a non-issue, is that the Beam Java SDK has the most IO connectors and we would want to use those IO implementations within Beam Go and Beam Python. This brings in its own set of issues related to versioning and compatibility for the wire format and how one parameterizes such transforms. The wire format issue can be solved with either approach by making sure that the cross-language expansion always takes the well-known format (whatever it may be) and converts it into the Read/KafkaSourceDescriptor/... object that is then passed to the ReadAll transform. Boyuan has been looking into making the KafkaSourceDescriptor have a schema so it can be represented as a row, and this can be done easily using the AutoValue integration (I don't believe there is anything preventing someone from writing a schema row -> Read -> row adapter, or from also using the AutoValue configuration if the transform is itself an AutoValue).
> >>>>>>>>
> >>>>>>>> I would be more for the code duplication and separation of concerns provided by using a different object to represent the contents of the PCollection than the pipeline-construction-time PTransform.
> >>>>>>>>
> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <kirpic...@google.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Ismael,
> >>>>>>>>>
> >>>>>>>>> Thanks for taking this on. Have you considered an approach similar (or dual) to FileIO.write(), where we in a sense also have to configure a dynamic number of different IO transforms of the same type (file writes)?
> >>>>>>>>>
> >>>>>>>>> E.g. here is how in this example we configure many aspects of many file writes:
> >>>>>>>>>
> >>>>>>>>> transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
> >>>>>>>>>     .by(Transaction::getType)
> >>>>>>>>>     .via(tx -> tx.getType().toFields(tx),  // Convert the data to be written to CSVSink
> >>>>>>>>>          type -> new CSVSink(type.getFieldNames()))
> >>>>>>>>>     .to(".../path/to/")
> >>>>>>>>>     .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
> >>>>>>>>>
> >>>>>>>>> we could do something similar for many JdbcIO reads:
> >>>>>>>>>
> >>>>>>>>> PCollection<Bar> bars;  // user-specific type from which all the read parameters can be inferred
> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
> >>>>>>>>>     .fromQuery(bar -> ...compute query for this bar...)
> >>>>>>>>>     .withMapper((bar, resultSet) -> new Moo(...))
> >>>>>>>>>     .withBatchSize(bar -> ...compute batch size for this bar...)
> >>>>>>>>>     ...etc);
> >>>>>>>>>
> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <ieme...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hello,
> >>>>>>>>>>
> >>>>>>>>>> (my excuses for the long email but this requires context)
> >>>>>>>>>>
> >>>>>>>>>> As part of the move from Source-based IOs to DoFn-based ones, one pattern emerged due to the composable nature of DoFn. The idea is to have a different kind of composable reads, where we take a PCollection of different sorts of intermediate specifications, e.g. tables, queries, etc. For example:
> >>>>>>>>>> JdbcIO:
> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends
> >>>>>>>>>>     PTransform<PCollection<ParameterT>, PCollection<OutputT>>
> >>>>>>>>>>
> >>>>>>>>>> RedisIO:
> >>>>>>>>>> ReadAll extends PTransform<PCollection<String>, PCollection<KV<String, String>>>
> >>>>>>>>>>
> >>>>>>>>>> HBaseIO:
> >>>>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>, PCollection<Result>>
> >>>>>>>>>>
> >>>>>>>>>> These patterns enabled richer use cases like doing multiple queries in the same pipeline, querying based on key patterns, or querying from multiple tables at the same time, but they came with some maintenance issues:
> >>>>>>>>>>
> >>>>>>>>>> - We ended up needing to add the parameters for the missing information to the ReadAll transforms, so we ended up with lots of duplicated 'with' methods and error-prone code copied from the Read transforms into the ReadAll transforms.
> >>>>>>>>>>
> >>>>>>>>>> - When you require new parameters, you have to expand the input parameters of the intermediary specification into something that resembles the full `Read` definition. For example, imagine you want to read from multiple tables or servers as part of the same pipeline, but this was not in the intermediate specification: you end up adding those extra methods (duplicating more code) just to get close to being like the full Read spec.
> >>>>>>>>>>
> >>>>>>>>>> - If new parameters are added to the Read method, we end up adding them systematically to the ReadAll transform too, so they are taken into account.
> >>>>>>>>>>
> >>>>>>>>>> Due to these issues I recently made a change to test a new approach that is simpler, more complete and more maintainable. The code became:
> >>>>>>>>>>
> >>>>>>>>>> HBaseIO:
> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>
> >>>>>>>>>>
> >>>>>>>>>> With this approach users gain the benefits of improvements to the parameters of the normal Read, because they get the full set of Read parameters. But of course there are some minor caveats:
> >>>>>>>>>>
> >>>>>>>>>> 1. You need to push some information into the normal Reads, for example partition boundary information or Restriction information (in the SDF case). Notice that this consistent approach to ReadAll produces a simple pattern that ends up being almost reusable between IOs (e.g. the non-SDF case):
> >>>>>>>>>>
> >>>>>>>>>> public static class ReadAll extends PTransform<PCollection<Read>,
> >>>>>>>>>>     PCollection<SolrDocument>> {
> >>>>>>>>>>   @Override
> >>>>>>>>>>   public PCollection<SolrDocument> expand(PCollection<Read> input) {
> >>>>>>>>>>     return input
> >>>>>>>>>>         .apply("Split", ParDo.of(new SplitFn()))
> >>>>>>>>>>         .apply("Reshuffle", Reshuffle.viaRandomKey())
> >>>>>>>>>>         .apply("Read", ParDo.of(new ReadFn()));
> >>>>>>>>>>   }
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>> 2. If you are using generic types for the ReadAll results, you must have the Coders used in its definition and require consistent types from the data sources; in practice this means we need to add extra withCoder method(s) on ReadAll, but not the full specs.
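To make caveat 1 concrete, here is a hedged sketch of pushing split information into the Read spec. SplitFn matches the DoFn named in the snippet above, while Partition, listPartitions, and withPartition are hypothetical helpers, not real SolrIO/HBaseIO APIs:

    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.DoFn.Element;
    import org.apache.beam.sdk.transforms.DoFn.ProcessElement;

    // Hypothetical SplitFn: splits one Read into several narrower Reads by
    // pushing partition/Restriction information into the Read spec itself,
    // so the downstream ReadFn can execute each sub-read independently.
    static class SplitFn extends DoFn<Read, Read> {
      @ProcessElement
      public void process(@Element Read read, OutputReceiver<Read> out) {
        List<Partition> partitions = listPartitions(read);  // hypothetical helper
        for (Partition p : partitions) {
          out.output(read.withPartition(p));  // hypothetical narrowing setter
        }
      }
    }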
> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow this ReadAll pattern. RedisIO and CassandraIO already have WIP PRs to do so, so I wanted to bring this subject to the mailing list to hear your opinions, and to see whether you spot any sort of issues that we might be missing with this idea.
> >>>>>>>>>>
> >>>>>>>>>> Also, I would like to see if we have consensus to start consistently using the terminology of ReadAll transforms based on Read, and the readAll() method, for new IOs (at this point, probably doing this in the only remaining inconsistent place, JdbcIO, might not be a good idea, but apart from this we should be ok).
> >>>>>>>>>>
> >>>>>>>>>> I mention this because the recent PR on KafkaIO based on SDF is doing something similar to the old pattern but is called ReadAll, and maybe it is worth being consistent for the benefit of users.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Ismaël