On Fri, Jun 26, 2020 at 11:49 AM Luke Cwik <[email protected]> wrote: > I would also like to suggest that transforms that implement ReadAll via > Read should also provide methods like: > > // Uses the specified values if unspecified in the input element from the > PCollection<Read>. > withDefaults(Read read); > // Uses the specified values regardless of what the input element from the > PCollection<Read> specifies. > withOverrides(Read read); > > and only add methods that are required at construction time (e.g. > coders). This way the majority of documentation sits on the Read transform. >
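For illustration, a minimal sketch of how the proposed methods could sit on a ReadAll built around PCollection<Read>. This is not an existing API: the merge semantics, merge(...) helper and ReadFn are assumptions standing in for the IO-specific pieces.

  // Sketch only: Read, ReadFn and merge(...) are placeholders for the per-IO logic.
  public static class ReadAll<OutputT>
      extends PTransform<PCollection<Read>, PCollection<OutputT>> {

    private final Read defaults;   // consulted only where the element leaves a value unset
    private final Read overrides;  // always replaces the element's value

    private ReadAll(Read defaults, Read overrides) {
      this.defaults = defaults;
      this.overrides = overrides;
    }

    public ReadAll<OutputT> withDefaults(Read read) {
      return new ReadAll<>(read, overrides);
    }

    public ReadAll<OutputT> withOverrides(Read read) {
      return new ReadAll<>(defaults, read);
    }

    @Override
    public PCollection<OutputT> expand(PCollection<Read> input) {
      return input
          .apply("MergeDefaultsAndOverrides", ParDo.of(new DoFn<Read, Read>() {
            @ProcessElement
            public void process(@Element Read read, OutputReceiver<Read> out) {
              // Fill unset fields from defaults, then let overrides win.
              out.output(merge(defaults, read, overrides));
            }
          }))
          .apply("Read", ParDo.of(new ReadFn<OutputT>()));
    }
  }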
+0 from me. Sounds like benefits outweigh the drawbacks here and some of the drawbacks related to cross-language can be overcome through future advancements. Thanks for bringing this up Ismaël. - Cham > > On Fri, Jun 26, 2020 at 9:58 AM Luke Cwik <[email protected]> wrote: > >> Ismael, it is good to hear that using Read as the input didn't have a >> bunch of parameters that were being skipped/ignored. Also, for the >> polymorphism issue you have to rely on the user correctly telling you the >> type in such a way where it is a common ancestor of all the runtime types >> that will ever be used. This usually boils down to something like >> Serializable or DynamicMessage such that the coder that is chosen works for >> all the runtime types. Using multiple types is a valid use case and would >> allow for a simpler graph with less flattens merging the output from >> multiple sources. >> >> Boyuan, as you have mentioned we can have a coder for KafkaIO.Read which >> uses schemas even if some of the parameters can't be represented in a >> meaningful way beyond "bytes". This would be helpful for cross language as >> well since every parameter would become available if a language could >> support it (e.g. it could serialize a java function up front and keep it >> saved as raw bytes within said language). Even if we figure out a better >> way to do this in the future, we'll have to change the schema for the new >> way anyway. This would mean that the external version of the transform >> adopts Row to Read and we drop KafkaSourceDescriptor. The conversion from >> Row to Read could validate that the parameters make sense (e.g. the bytes >> are valid serialized functions). The addition of an >> endReadTime/endReadOffset would make sense for KafkaIO.Read as well and >> this would enable having a bounded version that could be used for backfills >> (this doesn't have to be done as part of any current ongoing PR). >> Essentially any parameter that could be added for a single instance of a >> Kafka element+restriction would also make sense to the KafkaIO.Read >> transform since it too is a single instance. There are parameters that >> would apply to the ReadAll that wouldn't apply to a read and these would be >> global parameters across all element+restriction pairs such as config >> overrides or default values. >> >> I am convinced that we should do as Ismael is suggesting and use >> KafkaIO.Read as the type. >> >> >> On Thu, Jun 25, 2020 at 6:00 PM Chamikara Jayalath <[email protected]> >> wrote: >> >>> Discussion regarding cross-language transforms is a slight tangent here. >>> But I think, in general, it's great if we can use existing transforms (for >>> example, IO connectors) as cross-language transforms without having to >>> build more composites (irrespective of whether in ExternalTransformBuilders >>> or a user pipelines) just to make them cross-language compatible. A future >>> cross-language compatible SchemaCoder might help (assuming that works for >>> Read transform) but I'm not sure we have a good idea when we'll get to that >>> state. >>> >>> Thanks, >>> Cham >>> >>> On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang <[email protected]> wrote: >>> >>>> For unbounded SDF in Kafka, we also consider the upgrading/downgrading >>>> compatibility in the pipeline update scenario(For detailed discussion, >>>> please refer to >>>> https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E). 
>>>> In order to obtain the compatibility, it requires that the input of the read SDF >>>> be schema-aware. >>>> >>>> Thus the major constraint of mapping KafkaSourceDescriptor to >>>> PCollection<Read> is that KafkaIO.Read also needs to be schema-aware, >>>> otherwise pipeline updates might fail unnecessarily. Looking into >>>> KafkaIO.Read, not all necessary fields are compatible with schemas, for >>>> example, SerializableFunction. >>>> >>>> I'm kind of confused by why ReadAll<Read, OutputT> is a common pattern >>>> for SDF based IO. The Read can be a common pattern because the input is >>>> always a PBegin. But for an SDF based IO, the input can be anything. By >>>> using Read as input, we will still have the maintenance cost when SDF IO >>>> supports a new field but Read doesn't consume it. For example, we are >>>> discussing adding endOffset and endReadTime to KafkaSourceDescriptor, >>>> which are not used in KafkaIO.Read. >>>> >>>> On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía <[email protected]> wrote: >>>> >>>>> We forgot to mention (5) External.Config used in cross-lang, see >>>>> KafkaIO >>>>> ExternalTransformBuilder. This approach is the predecessor of (4) and >>>>> probably a >>>>> really good candidate to be replaced by the Row based Configuration >>>>> Boyuan is >>>>> envisioning (so good to be aware of this). >>>>> >>>>> Thanks for the clear explanation Luke, you mention the real issue(s). >>>>> All the >>>>> approaches discussed so far in the end could be easily transformed to >>>>> produce a >>>>> PCollection<Read> and those Read elements could be read by the generic >>>>> ReadAll >>>>> transform. Notice that this can be internal in some IOs e.g. KafkaIO >>>>> if they >>>>> decide not to expose it. I am not saying that we should force every IO >>>>> to >>>>> support ReadAll in its public API but if we do it is probably a good >>>>> idea to be >>>>> consistent with naming the transform that expects an input >>>>> PCollection<Read> in >>>>> the same way. Also notice that using it will save us from the >>>>> maintenance issues >>>>> discussed in my previous email. >>>>> >>>>> Back to the main concern: the consequences of expansion based on Read. >>>>> So far I >>>>> have not seen consequences for the Splitting part, which maps really >>>>> nicely >>>>> assuming the Partition info / Restriction is available as part of >>>>> Read. So far >>>>> there are no Serialization issues because Beam is already enforcing this. >>>>> Notice that >>>>> ReadAll expansion is almost ‘equivalent’ to a poor man's SDF at least >>>>> for the >>>>> Bounded case (see the code in my previous email). For the other points: >>>>> >>>>> > a) Do all properties set on a Read apply to the ReadAll? For >>>>> example, the >>>>> > Kafka Read implementation allows you to set the key and value >>>>> deserializers >>>>> > which are also used to dictate the output PCollection type. It also >>>>> allows you >>>>> > to set how the watermark should be computed. Technically a user may >>>>> want the >>>>> > watermark computation to be configurable per Read and they may also >>>>> want an >>>>> > output type which is polymorphic (e.g. PCollection<Serializable>). 
>>>>> >>>>> Most of the times they do but for parametric types we cannot support >>>>> different >>>>> types in the outputs of the Read or at least I did not find how to do >>>>> so (is >>>>> there a way to use multiple output Coders on Beam?), we saw this in >>>>> CassandraIO >>>>> and we were discussing adding explicitly these Coders or Serializer >>>>> specific methods to the ReadAll transform. This is less nice because >>>>> it will >>>>> imply some repeated methods, but it is still a compromise to gain the >>>>> other >>>>> advantages. I suppose the watermark case you mention is similar >>>>> because you may >>>>> want the watermark to behave differently in each Read and we probably >>>>> don’t >>>>> support this, so it corresponds to the polymorphic category. >>>>> >>>>> > b) Read extends PTransform which brings its own object modelling >>>>> concerns. >>>>> >>>>> > During the implementations of ReadAll(PCollection<Read>), was it >>>>> discovered >>>>> > that some properties became runtime errors or were ignored if they >>>>> were set? >>>>> > If no, then the code deduplication is likely worth it because we >>>>> also get a >>>>> > lot of javadoc deduplication, but if yes is this an acceptable user >>>>> > experience? >>>>> >>>>> No, not so far. This is an interesting part, notice that the Read >>>>> translation >>>>> ends up delegating the read bits to the ReadFn part of ReadAll so the >>>>> ReadFn is >>>>> the real read and must be aware and use all the parameters. >>>>> >>>>> @Override >>>>> public PCollection<SolrDocument> expand(PBegin input) { >>>>> return input.apply("Create", Create.of(this)).apply("ReadAll", >>>>> readAll()); >>>>> } >>>>> >>>>> I might be missing something for the Unbounded SDF case which is the >>>>> only case >>>>> we have not explored so far. I think one easy way to see the >>>>> limitations would >>>>> be in the ongoing KafkaIO SDF based implementation to try to map >>>>> KafkaSourceDescriptor to do the extra PCollection<Read> and the Read >>>>> logic on >>>>> the ReadAll with the SDF to see which constraints we hit, the >>>>> polymorphic ones >>>>> will be there for sure, maybe others will appear (not sure). However >>>>> it would be >>>>> interesting to see if we have a real gain in the maintenance points, >>>>> but well >>>>> let’s not forget also that KafkaIO has a LOT of knobs so probably the >>>>> generic >>>>> implementation could be relatively complex. >>>>> >>>>> >>>>> >>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <[email protected]> wrote: >>>>> > >>>>> > I had mentioned that approach 1 and approach 2 work for cross >>>>> language. The difference being that the cross language transform would >>>>> take >>>>> a well known definition and convert it to the Read transform. 
A normal >>>>> user >>>>> would have a pipeline that would look like: >>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output> >>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) -> >>>>> PCollection<Output> >>>>> > >>>>> > And in the cross language case this would look like: >>>>> > 1: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to >>>>> Read) -> PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output> >>>>> > 2: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row to >>>>> SourceDescriptor) -> PCollection<SourceDescriptor> -> PTransform(ReadAll) >>>>> -> PCollection<Output>* >>>>> > * note that PTransform(Convert Row to SourceDescriptor) only exists >>>>> since we haven't solved how to use schemas with language bound types in a >>>>> cross language way. SchemaCoder isn't portable but RowCoder is which is >>>>> why >>>>> the conversion step exists. We could have a solution for this at some >>>>> point >>>>> in time. >>>>> > >>>>> > My concern with using Read was around: >>>>> > a) Do all properties set on a Read apply to the ReadAll? For >>>>> example, the Kafka Read implementation allows you to set the key and value >>>>> deserializers which are also used to dictate the output PCollection type. >>>>> It also allows you to set how the watermark should be computed. >>>>> Technically >>>>> a user may want the watermark computation to be configurable per Read and >>>>> they may also want an output type which is polymorphic (e.g. >>>>> PCollection<Serializable>). >>>>> > b) Read extends PTransform which brings its own object modelling >>>>> concerns. >>>>> > >>>>> > During the implementations of ReadAll(PCollection<Read>), was it >>>>> discovered that some properties became runtime errors or were ignored if >>>>> they were set? If no, then the code deduplication is likely worth it >>>>> because we also get a lot of javadoc deduplication, but if yes is this an >>>>> acceptable user experience? >>>>> > >>>>> > >>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko < >>>>> [email protected]> wrote: >>>>> >> >>>>> >> I believe that the initial goal of unifying ReadAll as a general >>>>> "PTransform<PCollection<Read>, PCollection<OutputType>>” was to reduce the >>>>> amount of code duplication and error-prone approach related to this. It >>>>> makes much sense since usually we have all needed configuration set in >>>>> Read >>>>> objects and, as Ismaeil mentioned, ReadAll will consist mostly of only >>>>> Split-Shuffle-Read stages. So this case usually can be unified by using >>>>> PCollection<Read> as input. >>>>> >> >>>>> >> On the other hand, we have another need to use Java IOs as >>>>> cross-language transforms (as Luke described) which seems only partly in >>>>> common with previous pattern of ReadAll using. >>>>> >> >>>>> >> I’d be more in favour to have only one concept of read >>>>> configuration for all needs but seems it’s not easy and I’d be more in >>>>> favour with Luke and Boyuan approach with schema. Though, maybe ReadAll is >>>>> not a very suitable name in this case because it will can bring some >>>>> confusions related to previous pattern of ReadAll uses. >>>>> >> >>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang <[email protected]> wrote: >>>>> >> >>>>> >> Sorry for the typo. I mean I think we can go with (3) and (4): use >>>>> the data type that is schema-aware as the input of ReadAll. 
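As a concrete example of such a schema-aware input type, the usual Beam pattern is an AutoValue class annotated with @DefaultSchema so that a Schema (and therefore a SchemaCoder) is inferred from its getters; the descriptor name and fields below are purely illustrative, not a specific IO's descriptor.

  @DefaultSchema(AutoValueSchema.class)
  @AutoValue
  public abstract class SourceDescriptor implements Serializable {
    public abstract String getTopic();
    public abstract Integer getPartition();
    // Nullable getters stay optional fields in the inferred schema.
    public abstract @Nullable Long getStartOffset();

    public static SourceDescriptor of(String topic, int partition, @Nullable Long startOffset) {
      return new AutoValue_SourceDescriptor(topic, partition, startOffset);
    }
  }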
>>>>> >> >>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <[email protected]> >>>>> wrote: >>>>> >>> >>>>> >>> Thanks for the summary, Cham! >>>>> >>> >>>>> >>> I think we can go with (2) and (4): use the data type that is >>>>> schema-aware as the input of ReadAll. >>>>> >>> >>>>> >>> Converting Read into ReadAll helps us to stick with SDF-like IO. >>>>> But only having (3) is not enough to solve the problem of using ReadAll >>>>> in >>>>> x-lang case. >>>>> >>> >>>>> >>> The key point of ReadAll is that the input type of ReadAll should >>>>> be able to cross language boundaries and have compatibilities of >>>>> updating/downgrading. After investigating some possibilities(pure java >>>>> pojo >>>>> with custom coder, protobuf, row/schema) in Kafka usage, we find that >>>>> row/schema fits our needs most. Here comes (4). I believe that using Read >>>>> as input of ReadAll makes sense in some cases, but I also think not all >>>>> IOs >>>>> have the same need. I would treat Read as a special type as long as the >>>>> Read is schema-aware. >>>>> >>> >>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath < >>>>> [email protected]> wrote: >>>>> >>>> >>>>> >>>> I see. So it seems like there are three options discussed so far >>>>> when it comes to defining source descriptors for ReadAll type transforms >>>>> >>>> >>>>> >>>> (1) Use Read PTransform as the element type of the input >>>>> PCollection >>>>> >>>> (2) Use a POJO that describes the source as the data element of >>>>> the input PCollection >>>>> >>>> (3) Provide a converter as a function to the Read transform which >>>>> essentially will convert it to a ReadAll (what Eugene mentioned) >>>>> >>>> >>>>> >>>> I feel like (3) is more suitable for a related set of source >>>>> descriptions such as files. >>>>> >>>> (1) will allow most code-reuse but seems like will make it hard >>>>> to use the ReadAll transform as a cross-language transform and will break >>>>> the separation of construction time and runtime constructs >>>>> >>>> (2) could result to less code reuse if not careful but will make >>>>> the transform easier to be used as a cross-language transform without >>>>> additional modifications >>>>> >>>> >>>>> >>>> Also, with SDF, we can create ReadAll-like transforms that are >>>>> more efficient. So we might be able to just define all sources in that >>>>> format and make Read transforms just an easy to use composite built on top >>>>> of that (by adding a preceding Create transform). >>>>> >>>> >>>>> >>>> Thanks, >>>>> >>>> Cham >>>>> >>>> >>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <[email protected]> >>>>> wrote: >>>>> >>>>> >>>>> >>>>> I believe we do require PTransforms to be serializable since >>>>> anonymous DoFns typically capture the enclosing PTransform. >>>>> >>>>> >>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath < >>>>> [email protected]> wrote: >>>>> >>>>>> >>>>> >>>>>> Seems like Read in PCollection<Read> refers to a transform, at >>>>> least here: >>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353 >>>>> >>>>>> >>>>> >>>>>> I'm in favour of separating construction time transforms from >>>>> execution time data objects that we store in PCollections as Luke >>>>> mentioned. Also, we don't guarantee that PTransform is serializable so >>>>> users have the additional complexity of providing a corder whenever a >>>>> PTransform is used as a data object. 
>>>>> >>>>>> Also, agree with Boyuan that using simple Java objects that are >>>>> convertible to Beam Rows allow us to make these transforms available to >>>>> other SDKs through the cross-language transforms. Using transforms or >>>>> complex sources as data objects will probably make this difficult. >>>>> >>>>>> >>>>> >>>>>> Thanks, >>>>> >>>>>> Cham >>>>> >>>>>> >>>>> >>>>>> >>>>> >>>>>> >>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang < >>>>> [email protected]> wrote: >>>>> >>>>>>> >>>>> >>>>>>> Hi Ismael, >>>>> >>>>>>> >>>>> >>>>>>> I think the ReadAll in the IO connector refers to the IO with >>>>> SDF implementation despite the type of input, where Read refers to >>>>> UnboundedSource. One major pushback of using KafkaIO.Read as source >>>>> description is that not all configurations of KafkaIO.Read are meaningful >>>>> to populate during execution time. Also when thinking about x-lang useage, >>>>> making source description across language boundaries is also necessary. >>>>> As >>>>> Luke mentioned, it's quite easy to infer a Schema from an AutoValue >>>>> object: >>>>> KafkaSourceDescription.java. Then the coder of this schema-aware object >>>>> will be a SchemaCoder. When crossing language boundaries, it's also easy >>>>> to >>>>> convert a Row into the source description: Convert.fromRows. >>>>> >>>>>>> >>>>> >>>>>>> >>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <[email protected]> >>>>> wrote: >>>>> >>>>>>>> >>>>> >>>>>>>> To provide additional context, the KafkaIO ReadAll transform >>>>> takes a PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is >>>>> a >>>>> POJO that contains the configurable parameters for reading from Kafka. >>>>> This >>>>> is different from the pattern that Ismael listed because they take >>>>> PCollection<Read> as input and the Read is the same as the Read PTransform >>>>> class used for the non read all case. >>>>> >>>>>>>> >>>>> >>>>>>>> The KafkaSourceDescriptor does lead to duplication since >>>>> parameters used to configure the transform have to be copied over to the >>>>> source descriptor but decouples how a transform is specified from the >>>>> object that describes what needs to be done. I believe Ismael's point is >>>>> that we wouldn't need such a decoupling. >>>>> >>>>>>>> >>>>> >>>>>>>> Another area that hasn't been discussed and I believe is a >>>>> non-issue is that the Beam Java SDK has the most IO connectors and we >>>>> would >>>>> want to use the IO implementations within Beam Go and Beam Python. This >>>>> brings in its own set of issues related to versioning and compatibility >>>>> for >>>>> the wire format and how one parameterizes such transforms. The wire format >>>>> issue can be solved with either approach by making sure that the cross >>>>> language expansion always takes the well known format (whatever it may be) >>>>> and converts it into Read/KafkaSourceDescriptor/... object that is then >>>>> passed to the ReadAll transform. Boyuan has been looking to make the >>>>> KafkaSourceDescriptor have a schema so it can be represented as a row and >>>>> this can be done easily using the AutoValue integration (I don't believe >>>>> there is anything preventing someone from writing a schema row -> Read -> >>>>> row adapter or also using the AutoValue configuration if the transform is >>>>> also an AutoValue). 
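To make the Row-to-descriptor hop concrete, a sketch of what the cross-language expansion could do once the descriptor is schema-aware. Convert.fromRows is the existing schema transform mentioned above; SourceDescriptor, readAll() and the step names are assumptions for illustration.

  // The external SDK hands over plain Rows; they are converted to the
  // schema-aware descriptor before being fed into the ReadAll transform.
  PCollection<Row> rows = ...; // payload produced by the cross-language expansion
  PCollection<SourceDescriptor> descriptors =
      rows.apply("RowToSourceDescriptor", Convert.fromRows(SourceDescriptor.class));
  PCollection<OutputT> output = descriptors.apply("ReadAll", readAll());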
>>>>> >>>>>>>> >>>>> >>>>>>>> I would be more for the code duplication and separation of >>>>> concerns provided by using a different object to represent the contents of >>>>> the PCollection from the pipeline construction time PTransform. >>>>> >>>>>>>> >>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov < >>>>> [email protected]> wrote: >>>>> >>>>>>>>> >>>>> >>>>>>>>> Hi Ismael, >>>>> >>>>>>>>> >>>>> >>>>>>>>> Thanks for taking this on. Have you considered an approach >>>>> similar (or dual) to FileIO.write(), where we in a sense also have to >>>>> configure a dynamic number different IO transforms of the same type (file >>>>> writes)? >>>>> >>>>>>>>> >>>>> >>>>>>>>> E.g. how in this example we configure many aspects of many >>>>> file writes: >>>>> >>>>>>>>> >>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType, >>>>> Transaction>writeDynamic() >>>>> >>>>>>>>> .by(Transaction::getType) >>>>> >>>>>>>>> .via(tx -> tx.getType().toFields(tx), // Convert the >>>>> data to be written to CSVSink >>>>> >>>>>>>>> type -> new CSVSink(type.getFieldNames())) >>>>> >>>>>>>>> .to(".../path/to/") >>>>> >>>>>>>>> .withNaming(type -> defaultNaming(type + >>>>> "-transactions", ".csv")); >>>>> >>>>>>>>> >>>>> >>>>>>>>> we could do something similar for many JdbcIO reads: >>>>> >>>>>>>>> >>>>> >>>>>>>>> PCollection<Bar> bars; // user-specific type from which all >>>>> the read parameters can be inferred >>>>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll() >>>>> >>>>>>>>> .fromQuery(bar -> ...compute query for this bar...) >>>>> >>>>>>>>> .withMapper((bar, resultSet) -> new Moo(...)) >>>>> >>>>>>>>> .withBatchSize(bar -> ...compute batch size for this >>>>> bar...) >>>>> >>>>>>>>> ...etc); >>>>> >>>>>>>>> >>>>> >>>>>>>>> >>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía < >>>>> [email protected]> wrote: >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> Hello, >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> (my excuses for the long email but this requires context) >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> As part of the move from Source based IOs to DoFn based >>>>> ones. One pattern >>>>> >>>>>>>>>> emerged due to the composable nature of DoFn. The idea is >>>>> to have a different >>>>> >>>>>>>>>> kind of composable reads where we take a PCollection of >>>>> different sorts of >>>>> >>>>>>>>>> intermediate specifications e.g. tables, queries, etc, for >>>>> example: >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> JdbcIO: >>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends >>>>> >>>>>>>>>> PTransform<PCollection<ParameterT>, PCollection<OutputT>> >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> RedisIO: >>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<String>, >>>>> PCollection<KV<String, String>>> >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> HBaseIO: >>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>, >>>>> PCollection<Result>> >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> These patterns enabled richer use cases like doing multiple >>>>> queries in the same >>>>> >>>>>>>>>> Pipeline, querying based on key patterns or querying from >>>>> multiple tables at the >>>>> >>>>>>>>>> same time but came with some maintenance issues: >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> - We ended up needing to add to the ReadAll transforms the >>>>> parameters for >>>>> >>>>>>>>>> missing information so we ended up with lots of >>>>> duplicated with methods and >>>>> >>>>>>>>>> error-prone code from the Read transforms into the >>>>> ReadAll transforms. 
>>>>> >>>>>>>>>> >>>>> >>>>>>>>>> - When you require new parameters you have to expand the >>>>> input parameters of the >>>>> >>>>>>>>>> intermediary specification into something that resembles >>>>> the full `Read` >>>>> >>>>>>>>>> definition for example imagine you want to read from >>>>> multiple tables or >>>>> >>>>>>>>>> servers as part of the same pipeline but this was not in >>>>> the intermediate >>>>> >>>>>>>>>> specification you end up adding those extra methods >>>>> (duplicating more code) >>>>> >>>>>>>>>> just o get close to the be like the Read full spec. >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> - If new parameters are added to the Read method we end up >>>>> adding them >>>>> >>>>>>>>>> systematically to the ReadAll transform too so they are >>>>> taken into account. >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> Due to these issues I recently did a change to test a new >>>>> approach that is >>>>> >>>>>>>>>> simpler, more complete and maintainable. The code became: >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> HBaseIO: >>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>, >>>>> PCollection<Result>> >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> With this approach users gain benefits of improvements on >>>>> parameters of normal >>>>> >>>>>>>>>> Read because they count with the full Read parameters. But >>>>> of course there are >>>>> >>>>>>>>>> some minor caveats: >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> 1. You need to push some information into normal Reads for >>>>> example >>>>> >>>>>>>>>> partition boundaries information or Restriction >>>>> information (in the SDF >>>>> >>>>>>>>>> case). Notice that this consistent approach of ReadAll >>>>> produces a simple >>>>> >>>>>>>>>> pattern that ends up being almost reusable between IOs >>>>> (e.g. the non-SDF >>>>> >>>>>>>>>> case): >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> public static class ReadAll extends >>>>> PTransform<PCollection<Read>, >>>>> >>>>>>>>>> PCollection<SolrDocument>> { >>>>> >>>>>>>>>> @Override >>>>> >>>>>>>>>> public PCollection<SolrDocument> >>>>> expand(PCollection<Read> input) { >>>>> >>>>>>>>>> return input >>>>> >>>>>>>>>> .apply("Split", ParDo.of(new SplitFn())) >>>>> >>>>>>>>>> .apply("Reshuffle", Reshuffle.viaRandomKey()) >>>>> >>>>>>>>>> .apply("Read", ParDo.of(new ReadFn())); >>>>> >>>>>>>>>> } >>>>> >>>>>>>>>> } >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> 2. If you are using Generic types for the results ReadAll >>>>> you must have the >>>>> >>>>>>>>>> Coders used in its definition and require consistent >>>>> types from the data >>>>> >>>>>>>>>> sources, in practice this means we need to add extra >>>>> withCoder method(s) on >>>>> >>>>>>>>>> ReadAll but not the full specs. >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow this >>>>> ReadAll pattern. RedisIO >>>>> >>>>>>>>>> and CassandraIO have already WIP PRs to do so. So I wanted >>>>> to bring this subject >>>>> >>>>>>>>>> to the mailing list to see your opinions, and if you see >>>>> any sort of issues that >>>>> >>>>>>>>>> we might be missing with this idea. >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> Also I would like to see if we have consensus to start >>>>> using consistently the >>>>> >>>>>>>>>> terminology of ReadAll transforms based on Read and the >>>>> readAll() method for new >>>>> >>>>>>>>>> IOs (at this point probably outdoing this in the only >>>>> remaining inconsistent >>>>> >>>>>>>>>> place in JdbcIO might not be a good idea but apart of this >>>>> we should be ok). 
>>>>> >>>>>>>>>> >>>>> >>>>>>>>>> I mention this because the recent PR on KafkaIO based on >>>>> SDF is doing something >>>>> >>>>>>>>>> similar to the old pattern but being called ReadAll and >>>>> maybe it is worth to be >>>>> >>>>>>>>>> consistent for the benefit of users. >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> Regards, >>>>> >>>>>>>>>> Ismaël >>>>> >> >>>>> >> >>>>>
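To round out the Split-Shuffle-Read shape shown in Ismaël's ReadAll snippet above, a minimal sketch of the two DoFns it composes; how a Read splits and how it is actually executed are placeholders for the per-IO logic (read.split() and SolrClientWrapper are hypothetical names, not existing API).

  static class SplitFn extends DoFn<Read, Read> {
    @ProcessElement
    public void process(@Element Read read, OutputReceiver<Read> out) {
      // Emit one sub-Read per partition/shard of the original Read;
      // read.split() stands in for IO-specific splitting.
      for (Read split : read.split()) {
        out.output(split);
      }
    }
  }

  static class ReadFn extends DoFn<Read, SolrDocument> {
    @ProcessElement
    public void process(@Element Read read, OutputReceiver<SolrDocument> out) throws Exception {
      // The element carries the full Read configuration, so the "real read"
      // happens here using all of its parameters; the client is a placeholder.
      try (SolrClientWrapper client = SolrClientWrapper.open(read)) {
        for (SolrDocument doc : client.query(read)) {
          out.output(doc);
        }
      }
    }
  }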
