Discussion regarding cross-language transforms is a slight tangent here. But I think, in general, it's great if we can use existing transforms (for example, IO connectors) as cross-language transforms without having to build more composites (irrespective of whether that happens in ExternalTransformBuilders or in user pipelines) just to make them cross-language compatible. A future cross-language compatible SchemaCoder might help (assuming that works for the Read transform), but I'm not sure we have a good idea of when we'll get to that state.
Thanks,
Cham

On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang <boyu...@google.com> wrote:

For unbounded SDF in Kafka, we also considered the upgrading/downgrading compatibility in the pipeline update scenario (for the detailed discussion, please refer to https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E). In order to obtain that compatibility, the input of the read SDF needs to be schema-aware.

Thus the major constraint of mapping KafkaSourceDescriptor to PCollection<Read> is that KafkaIO.Read also needs to be schema-aware; otherwise pipeline updates might fail unnecessarily. Looking into KafkaIO.Read, not all necessary fields are compatible with a schema, for example SerializableFunction.

I'm kind of confused about why ReadAll<Read, OutputT> is a common pattern for SDF-based IO. Read can be a common pattern because the input is always a PBegin. But for an SDF-based IO, the input can be anything. By using Read as input, we will still have a maintenance cost whenever the SDF IO supports a new field that Read doesn't consume. For example, we are discussing adding endOffset and endReadTime to KafkaSourceDescriptor, which are not used in KafkaIO.Read.
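[A minimal sketch of the kind of schema-aware descriptor described above, assuming Beam's AutoValue schema inference; the class and its fields are hypothetical, not the actual KafkaSourceDescriptor:]

  import com.google.auto.value.AutoValue;
  import org.apache.beam.sdk.schemas.AutoValueSchema;
  import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

  // Every field is a plain schema type, so the object round-trips through
  // Row/SchemaCoder and stays readable across pipeline updates. A field of a
  // type like SerializableFunction could not be expressed as a schema field,
  // which is the constraint described above for KafkaIO.Read.
  @DefaultSchema(AutoValueSchema.class)
  @AutoValue
  public abstract class TopicPartitionDescriptor {
    public abstract String getTopic();       // schema type: STRING
    public abstract Integer getPartition();  // schema type: INT32
    public abstract Long getStartOffset();   // schema type: INT64

    public static TopicPartitionDescriptor of(String topic, int partition, long startOffset) {
      return new AutoValue_TopicPartitionDescriptor(topic, partition, startOffset);
    }
  }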
On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía <ieme...@gmail.com> wrote:

We forgot to mention (5): External.Config used in cross-lang, see the KafkaIO ExternalTransformBuilder. This approach is the predecessor of (4) and probably a really good candidate to be replaced by the Row-based configuration Boyuan is envisioning (so it is good to be aware of this).

Thanks for the clear explanation, Luke; you mention the real issue(s). All the approaches discussed so far could in the end be easily transformed to produce a PCollection<Read>, and those Read elements could be read by the generic ReadAll transform. Notice that this can be internal in some IOs, e.g. KafkaIO, if they decide not to expose it. I am not saying that we should force every IO to support ReadAll in its public API, but if we do, it is probably a good idea to be consistent and name the transform that expects an input PCollection<Read> in the same way. Also notice that using it will save us from the maintenance issues discussed in my previous email.

Back to the main concern, the consequences of expansion based on Read: so far I have not seen consequences for the splitting part, which maps really nicely, assuming the partition info / Restriction is available as part of Read. So far there are no serialization issues, because Beam is already enforcing this. Notice that the ReadAll expansion is almost 'equivalent' to a poor man's SDF, at least for the bounded case (see the code in my previous email). For the other points:

> a) Do all properties set on a Read apply to the ReadAll? For example, the Kafka Read implementation allows you to set the key and value deserializers which are also used to dictate the output PCollection type. It also allows you to set how the watermark should be computed. Technically a user may want the watermark computation to be configurable per Read and they may also want an output type which is polymorphic (e.g. PCollection<Serializable>).

Most of the time they do, but for parametric types we cannot support different types in the outputs of the Read, or at least I did not find how to do so (is there a way to use multiple output Coders in Beam?). We saw this in CassandraIO, and we were discussing adding these Coders or Serializer-specific methods explicitly to the ReadAll transform. This is less nice because it will imply some repeated methods, but it is still a compromise to gain the other advantages. I suppose the watermark case you mention is similar, because you may want the watermark to behave differently in each Read and we probably don't support this, so it corresponds to the polymorphic category.

> b) Read extends PTransform which brings its own object modelling concerns.
>
> During the implementations of ReadAll(PCollection<Read>), was it discovered that some properties became runtime errors or were ignored if they were set? If no, then the code deduplication is likely worth it because we also get a lot of javadoc deduplication, but if yes is this an acceptable user experience?

No, not so far. This is an interesting part; notice that the Read translation ends up delegating the read bits to the ReadFn part of ReadAll, so the ReadFn is the real read and must be aware of and use all the parameters.

  @Override
  public PCollection<SolrDocument> expand(PBegin input) {
    return input.apply("Create", Create.of(this)).apply("ReadAll", readAll());
  }

I might be missing something for the unbounded SDF case, which is the only case we have not explored so far. I think one easy way to see the limitations would be to try, in the ongoing SDF-based KafkaIO implementation, to map KafkaSourceDescriptor to the extra PCollection<Read> and the Read logic on the ReadAll with the SDF, to see which constraints we hit; the polymorphic ones will be there for sure, and maybe others will appear (not sure). However, it would be interesting to see if we have a real gain on the maintenance points, but let's not forget that KafkaIO has a LOT of knobs, so the generic implementation could be relatively complex.
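[A minimal sketch of the Coder compromise mentioned above, i.e. repeating an explicit withCoder method on a generic ReadAll. The names (ReadAll, SplitFn, ReadFn) are illustrative, modeled on the SolrIO/CassandraIO pattern rather than a concrete Beam API:]

  import org.apache.beam.sdk.coders.Coder;
  import org.apache.beam.sdk.transforms.PTransform;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.Reshuffle;
  import org.apache.beam.sdk.values.PCollection;

  public static class ReadAll<OutputT>
      extends PTransform<PCollection<Read>, PCollection<OutputT>> {

    private Coder<OutputT> coder;

    // The "repeated method" compromise: the PCollection<Read> input cannot
    // carry the output Coder for an arbitrary OutputT, so callers set it here.
    public ReadAll<OutputT> withCoder(Coder<OutputT> coder) {
      this.coder = coder;
      return this;
    }

    @Override
    public PCollection<OutputT> expand(PCollection<Read> input) {
      if (coder == null) {
        throw new IllegalStateException("withCoder() is required for a generic OutputT");
      }
      return input
          .apply("Split", ParDo.of(new SplitFn()))          // SplitFn/ReadFn: the IO's own DoFns
          .apply("Reshuffle", Reshuffle.viaRandomKey())
          .apply("Read", ParDo.of(new ReadFn<OutputT>()))
          .setCoder(coder);                                  // explicit Coder for the parametric output
    }
  }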
On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <lc...@google.com> wrote:

I had mentioned that approach 1 and approach 2 work for cross language. The difference being that the cross language transform would take a well known definition and convert it to the Read transform. A normal user would have a pipeline that would look like:

1: PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
2: PCollection<SourceDescriptor> -> PTransform(ReadAll) -> PCollection<Output>

And in the cross language case this would look like:

1: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to Read) -> PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
2: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to SourceDescriptor) -> PCollection<SourceDescriptor> -> PTransform(ReadAll) -> PCollection<Output>*

* Note that PTransform(Convert Row to SourceDescriptor) only exists since we haven't solved how to use schemas with language-bound types in a cross-language way. SchemaCoder isn't portable but RowCoder is, which is why the conversion step exists. We could have a solution for this at some point in time.

My concern with using Read was around:
a) Do all properties set on a Read apply to the ReadAll? For example, the Kafka Read implementation allows you to set the key and value deserializers, which are also used to dictate the output PCollection type. It also allows you to set how the watermark should be computed. Technically a user may want the watermark computation to be configurable per Read, and they may also want an output type which is polymorphic (e.g. PCollection<Serializable>).
b) Read extends PTransform, which brings its own object modelling concerns.

During the implementations of ReadAll(PCollection<Read>), was it discovered that some properties became runtime errors or were ignored if they were set? If no, then the code deduplication is likely worth it because we also get a lot of javadoc deduplication, but if yes, is this an acceptable user experience?

On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:

I believe that the initial goal of unifying ReadAll as a general "PTransform<PCollection<Read>, PCollection<OutputType>>" was to reduce the amount of code duplication and the error-prone approach related to this. It makes much sense, since usually we have all the needed configuration set in Read objects and, as Ismaël mentioned, ReadAll will consist mostly of only Split-Shuffle-Read stages. So this case can usually be unified by using PCollection<Read> as input.

On the other hand, we have another need: to use Java IOs as cross-language transforms (as Luke described), which seems only partly in common with the previous pattern of ReadAll usage.

I'd be more in favour of having only one concept of read configuration for all needs, but it seems that is not easy, so I'd be more in favour of Luke and Boyuan's approach with schemas. Though maybe ReadAll is not a very suitable name in this case, because it can bring some confusion related to the previous pattern of ReadAll usage.

On 25 Jun 2020, at 05:00, Boyuan Zhang <boyu...@google.com> wrote:

Sorry for the typo. I meant that I think we can go with (3) and (4): use the data type that is schema-aware as the input of ReadAll.

On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <boyu...@google.com> wrote:

Thanks for the summary, Cham!

I think we can go with (2) and (4): use the data type that is schema-aware as the input of ReadAll.

Converting Read into ReadAll helps us to stick with SDF-like IO. But only having (3) is not enough to solve the problem of using ReadAll in the x-lang case.

The key point of ReadAll is that the input type of ReadAll should be able to cross language boundaries and be compatible with upgrading/downgrading. After investigating some possibilities (pure Java POJO with a custom coder, protobuf, row/schema) in the Kafka use case, we found that row/schema fits our needs best. Hence (4). I believe that using Read as the input of ReadAll makes sense in some cases, but I also think not all IOs have the same need. I would treat Read as a special type, as long as the Read is schema-aware.
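[A minimal sketch of Luke's "normal user" pipeline shape 1 in the same-language case, using the HBaseIO-style readAll() from this thread; the configuration and table ids are placeholders, and it assumes a coder for the Read elements can be inferred or registered:]

  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.io.hbase.HBaseIO;
  import org.apache.beam.sdk.transforms.Create;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.client.Result;

  Pipeline p = Pipeline.create(options);
  Configuration conf = ...;  // HBase connection settings
  // The elements of the input PCollection are fully configured Read objects;
  // ReadAll just executes them, so two tables are read in one pipeline.
  PCollection<Result> results =
      p.apply("Specs", Create.of(
              HBaseIO.read().withConfiguration(conf).withTableId("table-a"),
              HBaseIO.read().withConfiguration(conf).withTableId("table-b")))
       .apply("ReadAll", HBaseIO.readAll());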
On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <chamik...@google.com> wrote:

I see. So it seems like there are three options discussed so far when it comes to defining source descriptors for ReadAll-type transforms:

(1) Use the Read PTransform as the element type of the input PCollection
(2) Use a POJO that describes the source as the data element of the input PCollection
(3) Provide a converter as a function to the Read transform which essentially will convert it to a ReadAll (what Eugene mentioned)

I feel like (3) is more suitable for a related set of source descriptions such as files.
(1) will allow the most code reuse, but seems like it will make it hard to use the ReadAll transform as a cross-language transform, and will break the separation of construction-time and runtime constructs.
(2) could result in less code reuse if not careful, but will make the transform easier to use as a cross-language transform without additional modifications.

Also, with SDF, we can create ReadAll-like transforms that are more efficient. So we might be able to just define all sources in that format and make Read transforms just an easy-to-use composite built on top of that (by adding a preceding Create transform).

Thanks,
Cham

On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <lc...@google.com> wrote:

I believe we do require PTransforms to be serializable, since anonymous DoFns typically capture the enclosing PTransform.

On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <chamik...@google.com> wrote:

Seems like Read in PCollection<Read> refers to a transform, at least here: https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353

I'm in favour of separating construction-time transforms from the execution-time data objects that we store in PCollections, as Luke mentioned. Also, we don't guarantee that PTransform is serializable, so users have the additional complexity of providing a coder whenever a PTransform is used as a data object.
Also, I agree with Boyuan that using simple Java objects that are convertible to Beam Rows allows us to make these transforms available to other SDKs through cross-language transforms. Using transforms or complex sources as data objects will probably make this difficult.

Thanks,
Cham

On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <boyu...@google.com> wrote:

Hi Ismael,

I think the ReadAll in the IO connector refers to the SDF implementation of the IO, regardless of the input type, whereas Read refers to UnboundedSource. One major pushback against using KafkaIO.Read as the source description is that not all configurations of KafkaIO.Read are meaningful to populate during execution time. Also, when thinking about x-lang usage, making the source description able to cross language boundaries is also necessary. As Luke mentioned, it's quite easy to infer a Schema from an AutoValue object: KafkaSourceDescription.java. Then the coder of this schema-aware object will be a SchemaCoder. When crossing language boundaries, it's also easy to convert a Row into the source description: Convert.fromRows.
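[A minimal sketch of the Convert.fromRows step Boyuan mentions at the cross-language boundary; KafkaSourceDescription stands in for whichever schema-aware descriptor class the IO defines, and its schema is assumed to be registered (e.g. via @DefaultSchema):]

  import org.apache.beam.sdk.schemas.transforms.Convert;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.Row;

  // Rows arrive from the non-Java SDK encoded with the portable RowCoder ...
  PCollection<Row> rows = ...;
  // ... and are converted back into the schema-aware descriptor type,
  // which can then feed the Java ReadAll transform.
  PCollection<KafkaSourceDescription> descriptors =
      rows.apply("FromRows", Convert.fromRows(KafkaSourceDescription.class));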
On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <lc...@google.com> wrote:

To provide additional context, the KafkaIO ReadAll transform takes a PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is a POJO that contains the configurable parameters for reading from Kafka. This is different from the pattern that Ismael listed, because those take PCollection<Read> as input, and the Read is the same Read PTransform class used for the non-read-all case.

The KafkaSourceDescriptor does lead to duplication, since parameters used to configure the transform have to be copied over to the source descriptor, but it decouples how a transform is specified from the object that describes what needs to be done. I believe Ismael's point is that we wouldn't need such a decoupling.

Another area that hasn't been discussed, and which I believe is a non-issue, is that the Beam Java SDK has the most IO connectors and we would want to use the IO implementations within Beam Go and Beam Python. This brings its own set of issues related to versioning and compatibility for the wire format and how one parameterizes such transforms. The wire format issue can be solved with either approach, by making sure that the cross-language expansion always takes the well-known format (whatever it may be) and converts it into the Read/KafkaSourceDescriptor/... object that is then passed to the ReadAll transform. Boyuan has been looking into making KafkaSourceDescriptor have a schema so it can be represented as a row, and this can be done easily using the AutoValue integration (I don't believe there is anything preventing someone from writing a schema row -> Read -> row adapter, or also using the AutoValue configuration if the transform is also an AutoValue).

I would be more for the code duplication and separation of concerns provided by using a different object to represent the contents of the PCollection from the pipeline-construction-time PTransform.

On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <kirpic...@google.com> wrote:

Hi Ismael,

Thanks for taking this on. Have you considered an approach similar (or dual) to FileIO.write(), where we in a sense also have to configure a dynamic number of different IO transforms of the same type (file writes)?

E.g. here is how in this example we configure many aspects of many file writes:

  transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
      .by(Transaction::getType)
      .via(tx -> tx.getType().toFields(tx),  // Convert the data to be written to CSVSink
           type -> new CSVSink(type.getFieldNames()))
      .to(".../path/to/")
      .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));

We could do something similar for many JdbcIO reads:

  PCollection<Bar> bars;  // user-specific type from which all the read parameters can be inferred
  PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
      .fromQuery(bar -> ...compute query for this bar...)
      .withMapper((bar, resultSet) -> new Moo(...))
      .withBatchSize(bar -> ...compute batch size for this bar...)
      ...etc);
On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <ieme...@gmail.com> wrote:

Hello,

(my excuses for the long email but this requires context)

As part of the move from Source-based IOs to DoFn-based ones, one pattern emerged due to the composable nature of DoFn. The idea is to have a different kind of composable reads where we take a PCollection of different sorts of intermediate specifications, e.g. tables, queries, etc., for example:

JdbcIO:
  ReadAll<ParameterT, OutputT> extends
    PTransform<PCollection<ParameterT>, PCollection<OutputT>>

RedisIO:
  ReadAll extends PTransform<PCollection<String>, PCollection<KV<String, String>>>

HBaseIO:
  ReadAll extends PTransform<PCollection<HBaseQuery>, PCollection<Result>>

These patterns enabled richer use cases like doing multiple queries in the same pipeline, querying based on key patterns, or querying from multiple tables at the same time, but they came with some maintenance issues:

- We ended up needing to add the parameters for the missing information to the ReadAll transforms, so we ended up with lots of duplicated `with` methods and error-prone code copied from the Read transforms into the ReadAll transforms.

- When you require new parameters, you have to expand the input parameters of the intermediary specification into something that resembles the full `Read` definition. For example, imagine you want to read from multiple tables or servers as part of the same pipeline, but this was not in the intermediate specification: you end up adding those extra methods (duplicating more code) just to get close to the full `Read` spec.

- If new parameters are added to the Read method, we end up adding them systematically to the ReadAll transform too, so they are taken into account.

Due to these issues I recently made a change to test a new approach that is simpler, more complete, and more maintainable. The code became:

HBaseIO:
  ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>

With this approach users gain the benefits of improvements to the parameters of the normal Read, because they count on the full Read parameters. But of course there are some minor caveats:

1. You need to push some information into the normal Reads, for example partition boundaries information or Restriction information (in the SDF case). Notice that this consistent approach to ReadAll produces a simple pattern that ends up being almost reusable between IOs (e.g. the non-SDF case):
  public static class ReadAll extends PTransform<PCollection<Read>,
      PCollection<SolrDocument>> {
    @Override
    public PCollection<SolrDocument> expand(PCollection<Read> input) {
      return input
          .apply("Split", ParDo.of(new SplitFn()))
          .apply("Reshuffle", Reshuffle.viaRandomKey())
          .apply("Read", ParDo.of(new ReadFn()));
    }
  }

2. If you are using generic types for the results of ReadAll, you must have the Coders used in its definition and require consistent types from the data sources; in practice this means we need to add extra withCoder method(s) on ReadAll, but not the full specs.

At the moment HBaseIO and SolrIO already follow this ReadAll pattern. RedisIO and CassandraIO already have WIP PRs to do so. So I wanted to bring this subject to the mailing list to hear your opinions, and to see if you spot any sort of issues that we might be missing with this idea.

Also, I would like to see if we have consensus to start consistently using the terminology of ReadAll transforms based on Read, and the readAll() method, for new IOs (at this point, redoing this in the only remaining inconsistent place, JdbcIO, might not be a good idea, but apart from that we should be ok).

I mention this because the recent PR on KafkaIO based on SDF is doing something similar to the old pattern but being called ReadAll, and maybe it is worth being consistent for the benefit of users.

Regards,
Ismaël
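[To make caveat 1 concrete, a minimal sketch of the two DoFns behind the generic expansion above: SplitFn turns one fully configured Read into narrower Reads, which is why Read must be able to carry partition/Restriction information, and ReadFn performs the actual read. The split() and execute() methods on Read are illustrative assumptions, not a concrete Beam API:]

  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.solr.common.SolrDocument;

  static class SplitFn extends DoFn<Read, Read> {
    @ProcessElement
    public void process(@Element Read read, OutputReceiver<Read> out) {
      // Emit one narrowed Read per partition/shard of the source; the
      // partition boundary information has been pushed into Read itself.
      for (Read partition : read.split()) {
        out.output(partition);
      }
    }
  }

  static class ReadFn extends DoFn<Read, SolrDocument> {
    @ProcessElement
    public void process(@Element Read read, OutputReceiver<SolrDocument> out) {
      // The ReadFn is "the real read": it must honor every parameter set on Read.
      for (SolrDocument doc : read.execute()) {
        out.output(doc);
      }
    }
  }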