I would also like to suggest that transforms that implement ReadAll via Read should also provide methods like:

  // Uses the specified values if unspecified in the input element from the PCollection<Read>.
  withDefaults(Read read);

  // Uses the specified values regardless of what the input element from the PCollection<Read> specifies.
  withOverrides(Read read);

and only add methods that are required at construction time (e.g. coders). This way the majority of documentation sits on the Read transform.

On Fri, Jun 26, 2020 at 9:58 AM Luke Cwik <[email protected]> wrote:

> Ismael, it is good to hear that using Read as the input didn't have a bunch of parameters that were being skipped/ignored. Also, for the polymorphism issue you have to rely on the user correctly telling you the type in such a way where it is a common ancestor of all the runtime types that will ever be used. This usually boils down to something like Serializable or DynamicMessage such that the coder that is chosen works for all the runtime types. Using multiple types is a valid use case and would allow for a simpler graph with fewer flattens merging the output from multiple sources.
>
> Boyuan, as you have mentioned we can have a coder for KafkaIO.Read which uses schemas even if some of the parameters can't be represented in a meaningful way beyond "bytes". This would be helpful for cross language as well since every parameter would become available if a language could support it (e.g. it could serialize a java function up front and keep it saved as raw bytes within said language). Even if we figure out a better way to do this in the future, we'll have to change the schema for the new way anyway. This would mean that the external version of the transform adopts Row to Read and we drop KafkaSourceDescriptor. The conversion from Row to Read could validate that the parameters make sense (e.g. the bytes are valid serialized functions). The addition of an endReadTime/endReadOffset would make sense for KafkaIO.Read as well and this would enable having a bounded version that could be used for backfills (this doesn't have to be done as part of any current ongoing PR). Essentially any parameter that could be added for a single instance of a Kafka element+restriction would also make sense to the KafkaIO.Read transform since it too is a single instance. There are parameters that would apply to the ReadAll that wouldn't apply to a read and these would be global parameters across all element+restriction pairs such as config overrides or default values.
>
> I am convinced that we should do as Ismael is suggesting and use KafkaIO.Read as the type.
>
> On Thu, Jun 25, 2020 at 6:00 PM Chamikara Jayalath <[email protected]> wrote:
>
>> Discussion regarding cross-language transforms is a slight tangent here. But I think, in general, it's great if we can use existing transforms (for example, IO connectors) as cross-language transforms without having to build more composites (irrespective of whether in ExternalTransformBuilders or user pipelines) just to make them cross-language compatible. A future cross-language compatible SchemaCoder might help (assuming that works for the Read transform) but I'm not sure we have a good idea when we'll get to that state.
>>
>> Thanks,
>> Cham
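To make the withDefaults/withOverrides idea above concrete, here is a rough sketch of how the defaults/overrides resolution could look on a made-up FooIO-style ReadAll. All names here are illustrative assumptions and the actual read step is elided; this is not an existing Beam API:

  import org.apache.beam.sdk.transforms.MapElements;
  import org.apache.beam.sdk.transforms.PTransform;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.TypeDescriptor;

  // Hypothetical read spec: a table name plus an optional fetch size (null means "unspecified").
  class FooRead implements java.io.Serializable {
    final String table;
    final Integer fetchSize;
    FooRead(String table, Integer fetchSize) { this.table = table; this.fetchSize = fetchSize; }
  }

  class FooReadAll extends PTransform<PCollection<FooRead>, PCollection<FooRead>> {
    private FooRead defaults = new FooRead(null, null);
    private FooRead overrides = new FooRead(null, null);

    // Uses the specified values if unspecified in the input element.
    FooReadAll withDefaults(FooRead read) { this.defaults = read; return this; }

    // Uses the specified values regardless of what the input element specifies.
    FooReadAll withOverrides(FooRead read) { this.overrides = read; return this; }

    @Override
    public PCollection<FooRead> expand(PCollection<FooRead> input) {
      FooRead d = defaults;
      FooRead o = overrides;
      // Resolve each element's configuration; the resolved specs would then feed the actual read logic.
      return input.apply(MapElements.into(TypeDescriptor.of(FooRead.class)).via((FooRead r) ->
          new FooRead(
              o.table != null ? o.table : (r.table != null ? r.table : d.table),
              o.fetchSize != null ? o.fetchSize : (r.fetchSize != null ? r.fetchSize : d.fetchSize))));
    }
  }

The point is simply that per-element values sit between pipeline-level defaults and pipeline-level overrides, so the per-parameter documentation stays on Read itself.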
>> On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang <[email protected]> wrote:
>>
>>> For unbounded SDF in Kafka, we also consider the upgrading/downgrading compatibility in the pipeline update scenario (for detailed discussion, please refer to https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E). In order to obtain the compatibility, it requires that the input of the read SDF be schema-aware.
>>>
>>> Thus the major constraint of mapping KafkaSourceDescriptor to PCollection<Read> is that KafkaIO.Read also needs to be schema-aware, otherwise pipeline updates might fail unnecessarily. Looking into KafkaIO.Read, not all necessary fields are compatible with schema, for example, SerializedFunction.
>>>
>>> I'm kind of confused by why ReadAll<Read, OutputT> is a common pattern for SDF based IO. The Read can be a common pattern because the input is always a PBegin. But for an SDF based IO, the input can be anything. By using Read as input, we will still have the maintenance cost when SDF IO supports a new field but Read doesn't consume it. For example, we are discussing adding endOffset and endReadTime to KafkaSourceDescriptor, which is not used in KafkaIO.Read.
>>>
>>> On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía <[email protected]> wrote:
>>>
>>>> We forgot to mention (5) External.Config used in cross-lang, see KafkaIO ExternalTransformBuilder. This approach is the predecessor of (4) and probably a really good candidate to be replaced by the Row based Configuration Boyuan is envisioning (so good to be aware of this).
>>>>
>>>> Thanks for the clear explanation Luke, you mention the real issue(s). All the approaches discussed so far in the end could be easily transformed to produce a PCollection<Read> and those Read elements could be read by the generic ReadAll transform. Notice that this can be internal in some IOs e.g. KafkaIO if they decide not to expose it. I am not saying that we should force every IO to support ReadAll in its public API but if we do it is probably a good idea to be consistent with naming the transform that expects an input PCollection<Read> in the same way. Also notice that using it will save us from the maintenance issues discussed in my previous email.
>>>>
>>>> Back to the main concern: the consequences of expansion based on Read. So far I have not seen consequences for the Splitting part which maps really nicely assuming the Partition info / Restriction is available as part of Read. So far there are no Serialization issues because Beam is already enforcing this. Notice that ReadAll expansion is almost ‘equivalent’ to a poor man's SDF at least for the Bounded case (see the code in my previous email). For the other points:
>>>>
>>>> > a) Do all properties set on a Read apply to the ReadAll? For example, the Kafka Read implementation allows you to set the key and value deserializers which are also used to dictate the output PCollection type. It also allows you to set how the watermark should be computed. Technically a user may want the watermark computation to be configurable per Read and they may also want an output type which is polymorphic (e.g. PCollection<Serializable>).
>>>> Most of the time they do, but for parametric types we cannot support different types in the outputs of the Read, or at least I did not find how to do so (is there a way to use multiple output Coders on Beam?); we saw this in CassandraIO and we were discussing explicitly adding these Coders or Serializer specific methods to the ReadAll transform. This is less nice because it will imply some repeated methods, but it is still a compromise to gain the other advantages. I suppose the watermark case you mention is similar because you may want the watermark to behave differently in each Read and we probably don’t support this, so it corresponds to the polymorphic category.
>>>>
>>>> > b) Read extends PTransform which brings its own object modelling concerns.
>>>>
>>>> > During the implementations of ReadAll(PCollection<Read>), was it discovered that some properties became runtime errors or were ignored if they were set? If no, then the code deduplication is likely worth it because we also get a lot of javadoc deduplication, but if yes is this an acceptable user experience?
>>>>
>>>> No, not so far. This is an interesting part, notice that the Read translation ends up delegating the read bits to the ReadFn part of ReadAll, so the ReadFn is the real read and must be aware of and use all the parameters.
>>>>
>>>>   @Override
>>>>   public PCollection<SolrDocument> expand(PBegin input) {
>>>>     return input.apply("Create", Create.of(this)).apply("ReadAll", readAll());
>>>>   }
>>>>
>>>> I might be missing something for the Unbounded SDF case which is the only case we have not explored so far. I think one easy way to see the limitations would be in the ongoing KafkaIO SDF based implementation to try to map KafkaSourceDescriptor to do the extra PCollection<Read> and the Read logic on the ReadAll with the SDF to see which constraints we hit; the polymorphic ones will be there for sure, maybe others will appear (not sure). However it would be interesting to see if we have a real gain in the maintenance points, but let’s not forget also that KafkaIO has a LOT of knobs so probably the generic implementation could be relatively complex.
>>>>
>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <[email protected]> wrote:
>>>> >
>>>> > I had mentioned that approach 1 and approach 2 work for cross language. The difference being that the cross language transform would take a well known definition and convert it to the Read transform. A normal user would have a pipeline that would look like:
>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) -> PCollection<Output>
>>>> >
>>>> > And in the cross language case this would look like:
>>>> > 1: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to Read) -> PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output>
>>>> > 2: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to SourceDescriptor) -> PCollection<SourceDescriptor> -> PTransform(ReadAll) -> PCollection<Output>*
>>>> > * note that PTransform(Convert Row to SourceDescriptor) only exists since we haven't solved how to use schemas with language bound types in a cross language way. SchemaCoder isn't portable but RowCoder is, which is why the conversion step exists. We could have a solution for this at some point in time.
>>>> >
>>>> > My concern with using Read was around:
>>>> > a) Do all properties set on a Read apply to the ReadAll? For example, the Kafka Read implementation allows you to set the key and value deserializers which are also used to dictate the output PCollection type. It also allows you to set how the watermark should be computed. Technically a user may want the watermark computation to be configurable per Read and they may also want an output type which is polymorphic (e.g. PCollection<Serializable>).
>>>> > b) Read extends PTransform which brings its own object modelling concerns.
>>>> >
>>>> > During the implementations of ReadAll(PCollection<Read>), was it discovered that some properties became runtime errors or were ignored if they were set? If no, then the code deduplication is likely worth it because we also get a lot of javadoc deduplication, but if yes is this an acceptable user experience?
>>>> >
>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <[email protected]> wrote:
>>>> >>
>>>> >> I believe that the initial goal of unifying ReadAll as a general "PTransform<PCollection<Read>, PCollection<OutputType>>" was to reduce the amount of code duplication and the error-prone approach related to this. It makes much sense since usually we have all needed configuration set in Read objects and, as Ismaël mentioned, ReadAll will consist mostly of only Split-Shuffle-Read stages. So this case usually can be unified by using PCollection<Read> as input.
>>>> >>
>>>> >> On the other hand, we have another need to use Java IOs as cross-language transforms (as Luke described) which seems only partly in common with the previous pattern of ReadAll usage.
>>>> >>
>>>> >> I’d be more in favour of having only one concept of read configuration for all needs, but it seems it’s not easy, so I’d be more in favour of Luke and Boyuan’s approach with schema. Though, maybe ReadAll is not a very suitable name in this case because it can bring some confusion related to the previous pattern of ReadAll uses.
>>>> >>
>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang <[email protected]> wrote:
>>>> >>
>>>> >> Sorry for the typo. I mean I think we can go with (3) and (4): use the data type that is schema-aware as the input of ReadAll.
>>>> >>
>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <[email protected]> wrote:
>>>> >>>
>>>> >>> Thanks for the summary, Cham!
>>>> >>>
>>>> >>> I think we can go with (2) and (4): use the data type that is schema-aware as the input of ReadAll.
>>>> >>>
>>>> >>> Converting Read into ReadAll helps us to stick with SDF-like IO. But only having (3) is not enough to solve the problem of using ReadAll in the x-lang case.
>>>> >>>
>>>> >>> The key point of ReadAll is that the input type of ReadAll should be able to cross language boundaries and have compatibility for updating/downgrading. After investigating some possibilities (pure Java POJO with custom coder, protobuf, row/schema) in Kafka usage, we find that row/schema fits our needs most. Here comes (4). I believe that using Read as input of ReadAll makes sense in some cases, but I also think not all IOs have the same need. I would treat Read as a special type as long as the Read is schema-aware.
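To illustrate the row/schema approach Boyuan describes, a hedged sketch follows: a schema-aware AutoValue source descriptor gets a SchemaCoder from Beam's AutoValue schema inference, and Rows arriving from another SDK can be converted back with Convert.fromRows. The descriptor below is made up for illustration and is not the actual KafkaSourceDescriptor:

  import com.google.auto.value.AutoValue;
  import org.apache.beam.sdk.schemas.AutoValueSchema;
  import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
  import org.apache.beam.sdk.schemas.transforms.Convert;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.Row;
  import org.checkerframework.checker.nullness.qual.Nullable;

  // Schema inferred from the AutoValue getters, so the element coder is a SchemaCoder.
  @DefaultSchema(AutoValueSchema.class)
  @AutoValue
  abstract class MySourceDescriptor {
    abstract String getTopic();
    abstract @Nullable Long getStartOffset();

    static MySourceDescriptor of(String topic, @Nullable Long startOffset) {
      return new AutoValue_MySourceDescriptor(topic, startOffset);
    }
  }

  // On the Java side of a cross-language expansion, rows can be turned back into descriptors:
  // PCollection<Row> rows = ...;  // produced by another SDK
  // PCollection<MySourceDescriptor> descriptors = rows.apply(Convert.fromRows(MySourceDescriptor.class));

Whether KafkaIO.Read itself can be made schema-aware in the same way is exactly the open question above, since fields like the deserializer functions don't map cleanly onto schema types.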
>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <[email protected]> wrote:
>>>> >>>>
>>>> >>>> I see. So it seems like there are three options discussed so far when it comes to defining source descriptors for ReadAll type transforms:
>>>> >>>>
>>>> >>>> (1) Use the Read PTransform as the element type of the input PCollection
>>>> >>>> (2) Use a POJO that describes the source as the data element of the input PCollection
>>>> >>>> (3) Provide a converter as a function to the Read transform which essentially will convert it to a ReadAll (what Eugene mentioned)
>>>> >>>>
>>>> >>>> I feel like (3) is more suitable for a related set of source descriptions such as files.
>>>> >>>> (1) will allow most code-reuse but seems like it will make it hard to use the ReadAll transform as a cross-language transform and will break the separation of construction time and runtime constructs.
>>>> >>>> (2) could result in less code reuse if not careful but will make the transform easier to be used as a cross-language transform without additional modifications.
>>>> >>>>
>>>> >>>> Also, with SDF, we can create ReadAll-like transforms that are more efficient. So we might be able to just define all sources in that format and make Read transforms just an easy-to-use composite built on top of that (by adding a preceding Create transform).
>>>> >>>>
>>>> >>>> Thanks,
>>>> >>>> Cham
>>>> >>>>
>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <[email protected]> wrote:
>>>> >>>>>
>>>> >>>>> I believe we do require PTransforms to be serializable since anonymous DoFns typically capture the enclosing PTransform.
>>>> >>>>>
>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <[email protected]> wrote:
>>>> >>>>>>
>>>> >>>>>> Seems like Read in PCollection<Read> refers to a transform, at least here: https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>> >>>>>>
>>>> >>>>>> I'm in favour of separating construction time transforms from execution time data objects that we store in PCollections as Luke mentioned. Also, we don't guarantee that PTransform is serializable so users have the additional complexity of providing a coder whenever a PTransform is used as a data object.
>>>> >>>>>> Also, agree with Boyuan that using simple Java objects that are convertible to Beam Rows allows us to make these transforms available to other SDKs through the cross-language transforms. Using transforms or complex sources as data objects will probably make this difficult.
>>>> >>>>>>
>>>> >>>>>> Thanks,
>>>> >>>>>> Cham
>>>> >>>>>>
>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <[email protected]> wrote:
>>>> >>>>>>>
>>>> >>>>>>> Hi Ismael,
>>>> >>>>>>>
>>>> >>>>>>> I think the ReadAll in the IO connector refers to the IO with SDF implementation regardless of the type of input, where Read refers to UnboundedSource. One major pushback of using KafkaIO.Read as source description is that not all configurations of KafkaIO.Read are meaningful to populate during execution time. Also when thinking about x-lang usage, making the source description work across language boundaries is also necessary. As Luke mentioned, it's quite easy to infer a Schema from an AutoValue object: KafkaSourceDescription.java. Then the coder of this schema-aware object will be a SchemaCoder. When crossing language boundaries, it's also easy to convert a Row into the source description: Convert.fromRows.
>>>> >>>>>>>
>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <[email protected]> wrote:
>>>> >>>>>>>>
>>>> >>>>>>>> To provide additional context, the KafkaIO ReadAll transform takes a PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is a POJO that contains the configurable parameters for reading from Kafka. This is different from the pattern that Ismael listed because they take PCollection<Read> as input and the Read is the same as the Read PTransform class used for the non-read-all case.
>>>> >>>>>>>>
>>>> >>>>>>>> The KafkaSourceDescriptor does lead to duplication since parameters used to configure the transform have to be copied over to the source descriptor, but decouples how a transform is specified from the object that describes what needs to be done. I believe Ismael's point is that we wouldn't need such a decoupling.
>>>> >>>>>>>>
>>>> >>>>>>>> Another area that hasn't been discussed and I believe is a non-issue is that the Beam Java SDK has the most IO connectors and we would want to use the IO implementations within Beam Go and Beam Python. This brings in its own set of issues related to versioning and compatibility for the wire format and how one parameterizes such transforms. The wire format issue can be solved with either approach by making sure that the cross language expansion always takes the well known format (whatever it may be) and converts it into the Read/KafkaSourceDescriptor/... object that is then passed to the ReadAll transform. Boyuan has been looking to make the KafkaSourceDescriptor have a schema so it can be represented as a row and this can be done easily using the AutoValue integration (I don't believe there is anything preventing someone from writing a schema row -> Read -> row adapter or also using the AutoValue configuration if the transform is also an AutoValue).
>>>> >>>>>>>>
>>>> >>>>>>>> I would be more for the code duplication and separation of concerns provided by using a different object to represent the contents of the PCollection from the pipeline construction time PTransform.
>>>> >>>>>>>>
>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <[email protected]> wrote:
>>>> >>>>>>>>>
>>>> >>>>>>>>> Hi Ismael,
>>>> >>>>>>>>>
>>>> >>>>>>>>> Thanks for taking this on. Have you considered an approach similar (or dual) to FileIO.write(), where we in a sense also have to configure a dynamic number of different IO transforms of the same type (file writes)?
>>>> >>>>>>>>>
>>>> >>>>>>>>> E.g. how in this example we configure many aspects of many file writes:
>>>> >>>>>>>>>
>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
>>>> >>>>>>>>>     .by(Transaction::getType)
>>>> >>>>>>>>>     .via(tx -> tx.getType().toFields(tx),  // Convert the data to be written to CSVSink
>>>> >>>>>>>>>          type -> new CSVSink(type.getFieldNames()))
>>>> >>>>>>>>>     .to(".../path/to/")
>>>> >>>>>>>>>     .withNaming(type -> defaultNaming(type + "-transactions", ".csv"));
>>>> >>>>>>>>>
>>>> >>>>>>>>> we could do something similar for many JdbcIO reads:
>>>> >>>>>>>>>
>>>> >>>>>>>>> PCollection<Bar> bars;  // user-specific type from which all the read parameters can be inferred
>>>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>>>> >>>>>>>>>     .fromQuery(bar -> ...compute query for this bar...)
>>>> >>>>>>>>>     .withMapper((bar, resultSet) -> new Moo(...))
>>>> >>>>>>>>>     .withBatchSize(bar -> ...compute batch size for this bar...)
>>>> >>>>>>>>>     ...etc);
>>>> >>>>>>>>>
>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <[email protected]> wrote:
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Hello,
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> (my excuses for the long email but this requires context)
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> As part of the move from Source based IOs to DoFn based ones, one pattern emerged due to the composable nature of DoFn. The idea is to have a different kind of composable reads where we take a PCollection of different sorts of intermediate specifications e.g. tables, queries, etc, for example:
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> JdbcIO:
>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> RedisIO:
>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<String>, PCollection<KV<String, String>>>
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> HBaseIO:
>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>, PCollection<Result>>
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> These patterns enabled richer use cases like doing multiple queries in the same Pipeline, querying based on key patterns or querying from multiple tables at the same time, but came with some maintenance issues:
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> - We ended up needing to add to the ReadAll transforms the parameters for missing information, so we ended up with lots of duplicated with-methods and error-prone code copied from the Read transforms into the ReadAll transforms.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> - When you require new parameters you have to expand the input parameters of the intermediary specification into something that resembles the full `Read` definition. For example, imagine you want to read from multiple tables or servers as part of the same pipeline but this was not in the intermediate specification; you end up adding those extra methods (duplicating more code) just to get close to being like the full Read spec.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> - If new parameters are added to the Read method we end up adding them systematically to the ReadAll transform too so they are taken into account.
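A hedged illustration of the duplication described in the first bullet above, using a made-up FooIO rather than any real connector: every knob added to Read (here a fetchSize) historically had to be mirrored on the ReadAll that takes raw descriptors and kept in sync by hand.

  import org.apache.beam.sdk.transforms.Create;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.PTransform;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.values.PBegin;
  import org.apache.beam.sdk.values.PCollection;

  class FooIOOldStyle {
    // Full Read spec: knows the query and the fetchSize knob.
    static class Read extends PTransform<PBegin, PCollection<String>> {
      private String query = "";
      private int fetchSize = 100;
      Read withQuery(String query) { this.query = query; return this; }
      Read withFetchSize(int fetchSize) { this.fetchSize = fetchSize; return this; }
      @Override
      public PCollection<String> expand(PBegin input) {
        return input.apply(Create.of(query)).apply(ParDo.of(new FetchFn(fetchSize)));
      }
    }

    // Old-style ReadAll over raw query strings: the fetchSize knob has to be duplicated here.
    static class ReadAll extends PTransform<PCollection<String>, PCollection<String>> {
      private int fetchSize = 100;  // duplicated, must be kept in sync with Read by hand
      ReadAll withFetchSize(int fetchSize) { this.fetchSize = fetchSize; return this; }
      @Override
      public PCollection<String> expand(PCollection<String> queries) {
        return queries.apply(ParDo.of(new FetchFn(fetchSize)));
      }
    }

    static class FetchFn extends DoFn<String, String> {
      private final int fetchSize;
      FetchFn(int fetchSize) { this.fetchSize = fetchSize; }
      @ProcessElement
      public void process(@Element String query, OutputReceiver<String> out) {
        out.output(query + " (fetchSize=" + fetchSize + ")");  // stand-in for a real per-query read
      }
    }
  }

With PCollection<Read> as the ReadAll input, as in the approach described next, fetchSize lives only on Read and the duplication disappears.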
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Due to these issues I recently did a change to test a new approach that is simpler, more complete and maintainable. The code became:
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> HBaseIO:
>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> With this approach users gain the benefits of improvements on parameters of the normal Read because they have the full Read parameters available. But of course there are some minor caveats:
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> 1. You need to push some information into normal Reads, for example partition boundary information or Restriction information (in the SDF case). Notice that this consistent approach of ReadAll produces a simple pattern that ends up being almost reusable between IOs (e.g. the non-SDF case):
>>>> >>>>>>>>>>
>>>> >>>>>>>>>>   public static class ReadAll extends PTransform<PCollection<Read>, PCollection<SolrDocument>> {
>>>> >>>>>>>>>>     @Override
>>>> >>>>>>>>>>     public PCollection<SolrDocument> expand(PCollection<Read> input) {
>>>> >>>>>>>>>>       return input
>>>> >>>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>>> >>>>>>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>>>> >>>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>>> >>>>>>>>>>     }
>>>> >>>>>>>>>>   }
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> 2. If you are using generic types for the ReadAll results you must have the Coders used in its definition and require consistent types from the data sources; in practice this means we need to add extra withCoder method(s) on ReadAll but not the full specs.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow this ReadAll pattern. RedisIO and CassandraIO have already WIP PRs to do so. So I wanted to bring this subject to the mailing list to see your opinions, and if you see any sort of issues that we might be missing with this idea.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Also I would like to see if we have consensus to start consistently using the terminology of ReadAll transforms based on Read and the readAll() method for new IOs (at this point probably doing this in the only remaining inconsistent place, JdbcIO, might not be a good idea but apart from this we should be ok).
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> I mention this because the recent PR on KafkaIO based on SDF is doing something similar to the old pattern but being called ReadAll, and maybe it is worth being consistent for the benefit of users.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Regards,
>>>> >>>>>>>>>> Ismaël
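For caveat 2 above, a hedged sketch of what the extra withCoder method(s) on a generic ReadAll could look like. All names are illustrative; splitting is elided and the per-element read is reduced to a user-supplied function:

  import org.apache.beam.sdk.coders.Coder;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.PTransform;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.Reshuffle;
  import org.apache.beam.sdk.transforms.SerializableFunction;
  import org.apache.beam.sdk.values.PCollection;

  // Generic ReadAll: OutputT cannot be inferred from the Read element alone,
  // so the caller supplies the output Coder explicitly via withCoder.
  class GenericReadAll<ReadT, OutputT> extends PTransform<PCollection<ReadT>, PCollection<OutputT>> {
    private final SerializableFunction<ReadT, Iterable<OutputT>> readFn;
    private Coder<OutputT> outputCoder;

    GenericReadAll(SerializableFunction<ReadT, Iterable<OutputT>> readFn) {
      this.readFn = readFn;
    }

    GenericReadAll<ReadT, OutputT> withCoder(Coder<OutputT> coder) {
      this.outputCoder = coder;
      return this;
    }

    @Override
    public PCollection<OutputT> expand(PCollection<ReadT> input) {
      PCollection<OutputT> output = input
          .apply("Reshuffle", Reshuffle.viaRandomKey())
          .apply("Read", ParDo.of(new DoFn<ReadT, OutputT>() {
            @ProcessElement
            public void process(@Element ReadT read, OutputReceiver<OutputT> out) {
              for (OutputT record : readFn.apply(read)) {
                out.output(record);
              }
            }
          }));
      // Without an explicit coder, Beam has to infer one for OutputT, which generally
      // fails for generic element types; hence the withCoder compromise.
      return outputCoder != null ? output.setCoder(outputCoder) : output;
    }
  }

This is the kind of repeated method the thread mentions: it buys the otherwise fully generic PCollection<Read> expansion at the cost of a coder knob on ReadAll itself.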