Sorry for jumping into this late and casting a vote against the consensus... but I think I'd prefer standardizing on a pattern like PCollection<KafkaSourceDescriptor> rather than PCollection<Read>. That approach clearly separates the parameters that are allowed to vary across a ReadAll (the ones defined in KafkaSourceDescriptor) from the parameters that should be constant (other parameters in the Read object, like SerializedFunctions for type conversions, parameters for different operating modes, etc...). I think it's helpful to think of the parameters that are allowed to vary as some "location descriptor", but I imagine IO authors may want other parameters to vary across a ReadAll as well.
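For illustration, a minimal sketch of that separation (the readAll() method, the deserializer setters and the descriptor fields shown here are assumptions for the sake of the example, not the actual KafkaIO API): the per-element parameters travel in the descriptors, while the constant ones are fixed once on the transform.

  // Hypothetical sketch: topic/partition/offset vary per element via the descriptor,
  // while deserializers and other operating modes stay constant on the transform itself.
  PCollection<KafkaSourceDescriptor> descriptors = ...;
  PCollection<KafkaRecord<String, String>> records =
      descriptors.apply(
          KafkaIO.<String, String>readAll()                      // hypothetical readAll()
              .withKeyDeserializer(StringDeserializer.class)     // constant across the
              .withValueDeserializer(StringDeserializer.class)); // whole ReadAll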
To me it seems safer to let an IO author "opt-in" to a parameter being dynamic at execution time. Brian On Mon, Jun 29, 2020 at 9:26 AM Eugene Kirpichov <kirpic...@google.com> wrote: > I'd like to raise one more time the question of consistency between > dynamic reads and dynamic writes, per my email at the beginning of the > thread. > If the community prefers ReadAll to read from Read, then should > dynamicWrite's write to Write? > > On Mon, Jun 29, 2020 at 8:57 AM Boyuan Zhang <boyu...@google.com> wrote: > >> It seems like most of us agree on the idea that ReadAll should read from >> Read. I'm going to update the Kafka ReadAll with the same pattern. >> Thanks for all your help! >> >> On Fri, Jun 26, 2020 at 12:12 PM Chamikara Jayalath <chamik...@google.com> >> wrote: >> >>> >>> >>> On Fri, Jun 26, 2020 at 11:49 AM Luke Cwik <lc...@google.com> wrote: >>> >>>> I would also like to suggest that transforms that implement ReadAll via >>>> Read should also provide methods like: >>>> >>>> // Uses the specified values if unspecified in the input element from >>>> the PCollection<Read>. >>>> withDefaults(Read read); >>>> // Uses the specified values regardless of what the input element from >>>> the PCollection<Read> specifies. >>>> withOverrides(Read read); >>>> >>>> and only adds methods that are required at construction time (e.g. >>>> coders). This way the majority of documentation sits on the Read transform. >>>> >>> >>> +0 from me. Sounds like benefits outweigh the drawbacks here and some of >>> the drawbacks related to cross-language can be overcome through future >>> advancements. >>> Thanks for bringing this up Ismaël. >>> >>> - Cham >>> >>> >>>> >>>> On Fri, Jun 26, 2020 at 9:58 AM Luke Cwik <lc...@google.com> wrote: >>>> >>>>> Ismael, it is good to hear that using Read as the input didn't have a >>>>> bunch of parameters that were being skipped/ignored. Also, for the >>>>> polymorphism issue you have to rely on the user correctly telling you the >>>>> type in such a way where it is a common ancestor of all the runtime types >>>>> that will ever be used. This usually boils down to something like >>>>> Serializable or DynamicMessage such that the coder that is chosen works >>>>> for >>>>> all the runtime types. Using multiple types is a valid use case and would >>>>> allow for a simpler graph with less flattens merging the output from >>>>> multiple sources. >>>>> >>>>> Boyuan, as you have mentioned we can have a coder for KafkaIO.Read >>>>> which uses schemas even if some of the parameters can't be represented in >>>>> a >>>>> meaningful way beyond "bytes". This would be helpful for cross language as >>>>> well since every parameter would become available if a language could >>>>> support it (e.g. it could serialize a java function up front and keep it >>>>> saved as raw bytes within said language). Even if we figure out a better >>>>> way to do this in the future, we'll have to change the schema for the new >>>>> way anyway. This would mean that the external version of the transform >>>>> adopts Row to Read and we drop KafkaSourceDescriptor. The conversion from >>>>> Row to Read could validate that the parameters make sense (e.g. the bytes >>>>> are valid serialized functions). The addition of an >>>>> endReadTime/endReadOffset would make sense for KafkaIO.Read as well and >>>>> this would enable having a bounded version that could be used for >>>>> backfills >>>>> (this doesn't have to be done as part of any current ongoing PR). 
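As a rough sketch of how the withDefaults idea suggested earlier in the thread could work at expansion time (mergeUnsetFrom and readViaSdf are hypothetical helpers, not existing API):

  // Sketch only: merge each incoming Read with the defaults before reading it.
  public static class ReadAll<K, V>
      extends PTransform<PCollection<KafkaIO.Read<K, V>>, PCollection<KafkaRecord<K, V>>> {

    private final @Nullable KafkaIO.Read<K, V> defaults;

    ReadAll(@Nullable KafkaIO.Read<K, V> defaults) {
      this.defaults = defaults;
    }

    // Uses the specified values if unspecified in the input element.
    public ReadAll<K, V> withDefaults(KafkaIO.Read<K, V> read) {
      return new ReadAll<>(read);
    }

    @Override
    public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaIO.Read<K, V>> input) {
      final KafkaIO.Read<K, V> localDefaults = defaults;
      PCollection<KafkaIO.Read<K, V>> merged =
          localDefaults == null
              ? input
              : input.apply(
                  "ApplyDefaults",
                  MapElements.into(new TypeDescriptor<KafkaIO.Read<K, V>>() {})
                      .via(read -> read.mergeUnsetFrom(localDefaults))); // hypothetical merge
      return merged.apply("ReadAll", readViaSdf()); // the underlying Split-Shuffle-Read stages
    }
  }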
>>>>> Essentially any parameter that could be added for a single instance of a >>>>> Kafka element+restriction would also make sense to the KafkaIO.Read >>>>> transform since it too is a single instance. There are parameters that >>>>> would apply to the ReadAll that wouldn't apply to a read and these would >>>>> be >>>>> global parameters across all element+restriction pairs such as config >>>>> overrides or default values. >>>>> >>>>> I am convinced that we should do as Ismael is suggesting and use >>>>> KafkaIO.Read as the type. >>>>> >>>>> >>>>> On Thu, Jun 25, 2020 at 6:00 PM Chamikara Jayalath < >>>>> chamik...@google.com> wrote: >>>>> >>>>>> Discussion regarding cross-language transforms is a slight tangent >>>>>> here. But I think, in general, it's great if we can use existing >>>>>> transforms >>>>>> (for example, IO connectors) as cross-language transforms without having >>>>>> to >>>>>> build more composites (irrespective of whether in >>>>>> ExternalTransformBuilders >>>>>> or in user pipelines) just to make them cross-language compatible. A >>>>>> future >>>>>> cross-language compatible SchemaCoder might help (assuming that works for >>>>>> the Read transform) but I'm not sure we have a good idea when we'll get to >>>>>> that >>>>>> state. >>>>>> >>>>>> Thanks, >>>>>> Cham >>>>>> >>>>>> On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang <boyu...@google.com> >>>>>> wrote: >>>>>> >>>>>>> For unbounded SDF in Kafka, we also consider the >>>>>>> upgrading/downgrading compatibility in the pipeline update scenario (For >>>>>>> detailed discussion, please refer to >>>>>>> https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E). >>>>>>> In order to obtain the compatibility, it requires that the input of the read >>>>>>> SDF >>>>>>> is schema-aware. >>>>>>> >>>>>>> Thus the major constraint of mapping KafkaSourceDescriptor to >>>>>>> PCollection<Read> is that the KafkaIO.Read also needs to be schema-aware, >>>>>>> otherwise pipeline updates might fail unnecessarily. If looking into >>>>>>> KafkaIO.Read, not all necessary fields are compatible with schema, for >>>>>>> example, SerializedFunction. >>>>>>> >>>>>>> I'm kind of confused by why ReadAll<Read, OutputT> is a common >>>>>>> pattern for SDF based IO. The Read can be a common pattern because the >>>>>>> input is always a PBegin. But for an SDF based IO, the input can be >>>>>>> anything. By using Read as input, we will still have the maintenance >>>>>>> cost >>>>>>> when SDF IO supports a new field but Read doesn't consume it. For >>>>>>> example, >>>>>>> we are discussing adding endOffset and endReadTime to >>>>>>> KafkaSourceDescriptor, which is not used in KafkaIO.Read. >>>>>>> >>>>>>> On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía <ieme...@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> We forgot to mention (5) External.Config used in cross-lang, see >>>>>>>> KafkaIO >>>>>>>> ExternalTransformBuilder. This approach is the predecessor of (4) >>>>>>>> and probably a >>>>>>>> really good candidate to be replaced by the Row based Configuration >>>>>>>> Boyuan is >>>>>>>> envisioning (so good to be aware of this). >>>>>>>> >>>>>>>> Thanks for the clear explanation Luke, you mention the real >>>>>>>> issue(s). All the >>>>>>>> approaches discussed so far in the end could be easily transformed >>>>>>>> to produce a >>>>>>>> PCollection<Read> and those Read Elements could be read by the >>>>>>>> generic ReadAll >>>>>>>> transform. Notice that this can be internal in some IOs e.g.
>>>>>>>> KafkaIO if they >>>>>>>> decide not to expose it. I am not saying that we should force every >>>>>>>> IO to >>>>>>>> support ReadAll in its public API but if we do it is probably a >>>>>>>> good idea to be >>>>>>>> consistent with naming the transform that expects an input >>>>>>>> PCollection<Read> in >>>>>>>> the same way. Also notice that using it will save us from the >>>>>>>> maintenance issues >>>>>>>> discussed in my previous email. >>>>>>>> >>>>>>>> Back to the main concern: the consequences of expansion based on >>>>>>>> Read: So far I >>>>>>>> have not seen consequences for the Splitting part which maps really >>>>>>>> nicely >>>>>>>> assuming the Partition info / Restriction is available as part of >>>>>>>> Read. So far >>>>>>>> there are no Serialization issues because Beam is already enforcing this. >>>>>>>> Notice that >>>>>>>> ReadAll expansion is almost ‘equivalent’ to a poor man's SDF at least >>>>>>>> for the >>>>>>>> Bounded case (see the code in my previous email). For the other >>>>>>>> points: >>>>>>>> >>>>>>>> > a) Do all properties set on a Read apply to the ReadAll? For >>>>>>>> example, the >>>>>>>> > Kafka Read implementation allows you to set the key and value >>>>>>>> deserializers >>>>>>>> > which are also used to dictate the output PCollection type. It >>>>>>>> also allows you >>>>>>>> > to set how the watermark should be computed. Technically a user >>>>>>>> may want the >>>>>>>> > watermark computation to be configurable per Read and they may >>>>>>>> also want an >>>>>>>> > output type which is polymorphic (e.g. PCollection<Serializable>). >>>>>>>> >>>>>>>> Most of the time they do but for parametric types we cannot >>>>>>>> support different >>>>>>>> types in the outputs of the Read or at least I did not find how to >>>>>>>> do so (is >>>>>>>> there a way to use multiple output Coders on Beam?), we saw this in >>>>>>>> CassandraIO >>>>>>>> and we were discussing explicitly adding these Coders or Serializer >>>>>>>> specific methods to the ReadAll transform. This is less nice >>>>>>>> because it will >>>>>>>> imply some repeated methods, but it is still a compromise to gain >>>>>>>> the other >>>>>>>> advantages. I suppose the watermark case you mention is similar >>>>>>>> because you may >>>>>>>> want the watermark to behave differently in each Read and we >>>>>>>> probably don’t >>>>>>>> support this, so it corresponds to the polymorphic category. >>>>>>>> >>>>>>>> > b) Read extends PTransform which brings its own object modelling >>>>>>>> concerns. >>>>>>>> >>>>>>>> > During the implementations of ReadAll(PCollection<Read>), was it >>>>>>>> discovered >>>>>>>> > that some properties became runtime errors or were ignored if >>>>>>>> they were set? >>>>>>>> > If no, then the code deduplication is likely worth it because we >>>>>>>> also get a >>>>>>>> > lot of javadoc deduplication, but if yes is this an acceptable >>>>>>>> user >>>>>>>> > experience? >>>>>>>> >>>>>>>> No, not so far. This is an interesting part; notice that the Read >>>>>>>> translation >>>>>>>> ends up delegating the read bits to the ReadFn part of ReadAll so >>>>>>>> the ReadFn is >>>>>>>> the real read and must be aware of and use all the parameters. >>>>>>>> >>>>>>>> @Override >>>>>>>> public PCollection<SolrDocument> expand(PBegin input) { >>>>>>>> return input.apply("Create", >>>>>>>> Create.of(this)).apply("ReadAll", readAll()); >>>>>>>> } >>>>>>>> >>>>>>>> I might be missing something for the Unbounded SDF case which is >>>>>>>> the only case >>>>>>>> we have not explored so far.
I think one easy way to see the >>>>>>>> limitations would >>>>>>>> be in the ongoing KafkaIO SDF based implementation to try to map >>>>>>>> KafkaSourceDescriptor to do the extra PCollection<Read> and the >>>>>>>> Read logic on >>>>>>>> the ReadAll with the SDF to see which constraints we hit, the >>>>>>>> polymorphic ones >>>>>>>> will be there for sure, maybe others will appear (not sure). >>>>>>>> However it would be >>>>>>>> interesting to see if we have a real gain in the maintenance >>>>>>>> points, but well >>>>>>>> let’s not forget also that KafkaIO has a LOT of knobs so probably >>>>>>>> the generic >>>>>>>> implementation could be relatively complex. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik <lc...@google.com> wrote: >>>>>>>> > >>>>>>>> > I had mentioned that approach 1 and approach 2 work for cross >>>>>>>> language. The difference being that the cross language transform would >>>>>>>> take >>>>>>>> a well known definition and convert it to the Read transform. A normal >>>>>>>> user >>>>>>>> would have a pipeline that would look like: >>>>>>>> > 1: PCollection<Read> -> PTransform(ReadAll) -> PCollection<Output> >>>>>>>> > 2: PCollection<SourceDescriptor> -> PTransform(ReadAll) -> >>>>>>>> PCollection<Output> >>>>>>>> > >>>>>>>> > And in the cross language case this would look like: >>>>>>>> > 1: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row >>>>>>>> to Read) -> PCollection<Read> -> PTransform(ReadAll) -> >>>>>>>> PCollection<Output> >>>>>>>> > 2: PCollection<Row of SourceDescriptor> -> PTransform(Convert Row >>>>>>>> to SourceDescriptor) -> PCollection<SourceDescriptor> -> >>>>>>>> PTransform(ReadAll) -> PCollection<Output>* >>>>>>>> > * note that PTransform(Convert Row to SourceDescriptor) only >>>>>>>> exists since we haven't solved how to use schemas with language bound >>>>>>>> types >>>>>>>> in a cross language way. SchemaCoder isn't portable but RowCoder is >>>>>>>> which >>>>>>>> is why the conversion step exists. We could have a solution for this at >>>>>>>> some point in time. >>>>>>>> > >>>>>>>> > My concern with using Read was around: >>>>>>>> > a) Do all properties set on a Read apply to the ReadAll? For >>>>>>>> example, the Kafka Read implementation allows you to set the key and >>>>>>>> value >>>>>>>> deserializers which are also used to dictate the output PCollection >>>>>>>> type. >>>>>>>> It also allows you to set how the watermark should be computed. >>>>>>>> Technically >>>>>>>> a user may want the watermark computation to be configurable per Read >>>>>>>> and >>>>>>>> they may also want an output type which is polymorphic (e.g. >>>>>>>> PCollection<Serializable>). >>>>>>>> > b) Read extends PTransform which brings its own object modelling >>>>>>>> concerns. >>>>>>>> > >>>>>>>> > During the implementations of ReadAll(PCollection<Read>), was it >>>>>>>> discovered that some properties became runtime errors or were ignored >>>>>>>> if >>>>>>>> they were set? If no, then the code deduplication is likely worth it >>>>>>>> because we also get a lot of javadoc deduplication, but if yes is this >>>>>>>> an >>>>>>>> acceptable user experience? 
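For reference, the "Convert Row to SourceDescriptor" step in option 2 can lean on the schema machinery already in Beam; a sketch (the descriptor fields shown are assumptions for illustration):

  // Sketch: a schema-aware descriptor so it can cross language boundaries as Rows.
  @DefaultSchema(AutoValueSchema.class)
  @AutoValue
  public abstract static class KafkaSourceDescriptor {
    public abstract String getTopic();              // field names here are
    public abstract Integer getPartition();         // assumptions for illustration
    public abstract @Nullable Long getStartOffset();
  }

  // Cross-language expansion (option 2): Rows coming from another SDK are converted
  // back into descriptors before feeding the ReadAll transform.
  PCollection<Row> rows = ...;  // received over the portability boundary
  rows.apply(Convert.fromRows(KafkaSourceDescriptor.class))
      .apply(new ReadAll());    // a ReadAll taking PCollection<KafkaSourceDescriptor>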
>>>>>>>> > >>>>>>>> > >>>>>>>> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko < >>>>>>>> aromanenko....@gmail.com> wrote: >>>>>>>> >> >>>>>>>> >> I believe that the initial goal of unifying ReadAll as a general >>>>>>>> "PTransform<PCollection<Read>, PCollection<OutputType>>” was to reduce >>>>>>>> the >>>>>>>> amount of code duplication and the error-prone approach related to this. It >>>>>>>> makes much sense since usually we have all needed configuration set in >>>>>>>> Read >>>>>>>> objects and, as Ismaël mentioned, ReadAll will consist mostly of only >>>>>>>> Split-Shuffle-Read stages. So this case usually can be unified by >>>>>>>> using >>>>>>>> PCollection<Read> as input. >>>>>>>> >> >>>>>>>> >> On the other hand, we have another need to use Java IOs as >>>>>>>> cross-language transforms (as Luke described) which seems only partly >>>>>>>> in >>>>>>>> common with the previous pattern of ReadAll usage. >>>>>>>> >> >>>>>>>> >> I’d be more in favour of having only one concept of read >>>>>>>> configuration for all needs but it seems it’s not easy, so I’d be more in >>>>>>>> favour of Luke and Boyuan's approach with schema. Though, maybe >>>>>>>> ReadAll is >>>>>>>> not a very suitable name in this case because it can bring some >>>>>>>> confusion related to the previous pattern of ReadAll uses. >>>>>>>> >> >>>>>>>> >> On 25 Jun 2020, at 05:00, Boyuan Zhang <boyu...@google.com> >>>>>>>> wrote: >>>>>>>> >> >>>>>>>> >> Sorry for the typo. I mean I think we can go with (3) and (4): >>>>>>>> use the data type that is schema-aware as the input of ReadAll. >>>>>>>> >> >>>>>>>> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <boyu...@google.com> >>>>>>>> wrote: >>>>>>>> >>> >>>>>>>> >>> Thanks for the summary, Cham! >>>>>>>> >>> >>>>>>>> >>> I think we can go with (2) and (4): use the data type that is >>>>>>>> schema-aware as the input of ReadAll. >>>>>>>> >>> >>>>>>>> >>> Converting Read into ReadAll helps us to stick with SDF-like >>>>>>>> IO. But only having (3) is not enough to solve the problem of using >>>>>>>> ReadAll in the x-lang case. >>>>>>>> >>> >>>>>>>> >>> The key point of ReadAll is that the input type of ReadAll >>>>>>>> should be able to cross language boundaries and have compatibility for >>>>>>>> updating/downgrading. After investigating some possibilities (pure Java >>>>>>>> POJO >>>>>>>> with custom coder, protobuf, row/schema) in Kafka usage, we find that >>>>>>>> row/schema fits our needs most. Here comes (4). I believe that using >>>>>>>> Read >>>>>>>> as input of ReadAll makes sense in some cases, but I also think not >>>>>>>> all IOs >>>>>>>> have the same need. I would treat Read as a special type as long as the >>>>>>>> Read is schema-aware. >>>>>>>> >>> >>>>>>>> >>> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath < >>>>>>>> chamik...@google.com> wrote: >>>>>>>> >>>> >>>>>>>> >>>> I see. So it seems like there are three options discussed so >>>>>>>> far when it comes to defining source descriptors for ReadAll type >>>>>>>> transforms >>>>>>>> >>>> >>>>>>>> >>>> (1) Use Read PTransform as the element type of the input >>>>>>>> PCollection >>>>>>>> >>>> (2) Use a POJO that describes the source as the data element >>>>>>>> of the input PCollection >>>>>>>> >>>> (3) Provide a converter as a function to the Read transform >>>>>>>> which essentially will convert it to a ReadAll (what Eugene mentioned) >>>>>>>> >>>> >>>>>>>> >>>> I feel like (3) is more suitable for a related set of source >>>>>>>> descriptions such as files.
>>>>>>>> >>>> (1) will allow most code-reuse but seems like it will make it >>>>>>>> hard to use the ReadAll transform as a cross-language transform and >>>>>>>> will >>>>>>>> break the separation of construction time and runtime constructs >>>>>>>> >>>> (2) could result in less code reuse if not careful but will >>>>>>>> make the transform easier to be used as a cross-language transform >>>>>>>> without >>>>>>>> additional modifications >>>>>>>> >>>> >>>>>>>> >>>> Also, with SDF, we can create ReadAll-like transforms that are >>>>>>>> more efficient. So we might be able to just define all sources in that >>>>>>>> format and make Read transforms just an easy to use composite built on >>>>>>>> top >>>>>>>> of that (by adding a preceding Create transform). >>>>>>>> >>>> >>>>>>>> >>>> Thanks, >>>>>>>> >>>> Cham >>>>>>>> >>>> >>>>>>>> >>>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <lc...@google.com> >>>>>>>> wrote: >>>>>>>> >>>>> >>>>>>>> >>>>> I believe we do require PTransforms to be serializable since >>>>>>>> anonymous DoFns typically capture the enclosing PTransform. >>>>>>>> >>>>> >>>>>>>> >>>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath < >>>>>>>> chamik...@google.com> wrote: >>>>>>>> >>>>>> >>>>>>>> >>>>>> Seems like Read in PCollection<Read> refers to a transform, >>>>>>>> at least here: >>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353 >>>>>>>> >>>>>> >>>>>>>> >>>>>> I'm in favour of separating construction time transforms >>>>>>>> from execution time data objects that we store in PCollections as Luke >>>>>>>> mentioned. Also, we don't guarantee that PTransform is serializable so >>>>>>>> users have the additional complexity of providing a coder whenever a >>>>>>>> PTransform is used as a data object. >>>>>>>> >>>>>> Also, agree with Boyuan that using simple Java objects that >>>>>>>> are convertible to Beam Rows allows us to make these transforms >>>>>>>> available to >>>>>>>> other SDKs through the cross-language transforms. Using transforms or >>>>>>>> complex sources as data objects will probably make this difficult. >>>>>>>> >>>>>> >>>>>>>> >>>>>> Thanks, >>>>>>>> >>>>>> Cham >>>>>>>> >>>>>> >>>>>>>> >>>>>> >>>>>>>> >>>>>> >>>>>>>> >>>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang < >>>>>>>> boyu...@google.com> wrote: >>>>>>>> >>>>>>> >>>>>>>> >>>>>>> Hi Ismael, >>>>>>>> >>>>>>> >>>>>>>> >>>>>>> I think the ReadAll in the IO connector refers to the IO >>>>>>>> with SDF implementation regardless of the type of input, where Read refers to >>>>>>>> UnboundedSource. One major pushback of using KafkaIO.Read as the source >>>>>>>> description is that not all configurations of KafkaIO.Read are >>>>>>>> meaningful >>>>>>>> to populate during execution time. Also when thinking about x-lang >>>>>>>> usage, >>>>>>>> making the source description work across language boundaries is also >>>>>>>> necessary. As >>>>>>>> Luke mentioned, it's quite easy to infer a Schema from an AutoValue >>>>>>>> object: >>>>>>>> KafkaSourceDescription.java. Then the coder of this schema-aware object >>>>>>>> will be a SchemaCoder. When crossing language boundaries, it's also >>>>>>>> easy to >>>>>>>> convert a Row into the source description: Convert.fromRows. >>>>>>>> >>>>>>> >>>>>>>> >>>>>>> >>>>>>>> >>>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <lc...@google.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> To provide additional context, the KafkaIO ReadAll >>>>>>>> transform takes a PCollection<KafkaSourceDescriptor>.
This >>>>>>>> KafkaSourceDescriptor is a POJO that contains the configurable >>>>>>>> parameters >>>>>>>> for reading from Kafka. This is different from the pattern that Ismael >>>>>>>> listed because they take PCollection<Read> as input and the Read is the >>>>>>>> same as the Read PTransform class used for the non read all case. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> The KafkaSourceDescriptor does lead to duplication since >>>>>>>> parameters used to configure the transform have to be copied over to >>>>>>>> the >>>>>>>> source descriptor but decouples how a transform is specified from the >>>>>>>> object that describes what needs to be done. I believe Ismael's point >>>>>>>> is >>>>>>>> that we wouldn't need such a decoupling. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Another area that hasn't been discussed and I believe is a >>>>>>>> non-issue is that the Beam Java SDK has the most IO connectors and we >>>>>>>> would >>>>>>>> want to use the IO implementations within Beam Go and Beam Python. This >>>>>>>> brings in its own set of issues related to versioning and >>>>>>>> compatibility for >>>>>>>> the wire format and how one parameterizes such transforms. The wire >>>>>>>> format >>>>>>>> issue can be solved with either approach by making sure that the cross >>>>>>>> language expansion always takes the well known format (whatever it may >>>>>>>> be) >>>>>>>> and converts it into Read/KafkaSourceDescriptor/... object that is then >>>>>>>> passed to the ReadAll transform. Boyuan has been looking to make the >>>>>>>> KafkaSourceDescriptor have a schema so it can be represented as a row >>>>>>>> and >>>>>>>> this can be done easily using the AutoValue integration (I don't >>>>>>>> believe >>>>>>>> there is anything preventing someone from writing a schema row -> Read >>>>>>>> -> >>>>>>>> row adapter or also using the AutoValue configuration if the transform >>>>>>>> is >>>>>>>> also an AutoValue). >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> I would be more for the code duplication and separation of >>>>>>>> concerns provided by using a different object to represent the >>>>>>>> contents of >>>>>>>> the PCollection from the pipeline construction time PTransform. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov < >>>>>>>> kirpic...@google.com> wrote: >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> Hi Ismael, >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> Thanks for taking this on. Have you considered an >>>>>>>> approach similar (or dual) to FileIO.write(), where we in a sense also >>>>>>>> have >>>>>>>> to configure a dynamic number different IO transforms of the same type >>>>>>>> (file writes)? >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> E.g. 
how in this example we configure many aspects of >>>>>>>> many file writes: >>>>>>>>> >>>>>>>> >>>>>>>>> transactions.apply(FileIO.<TransactionType, >>>>>>>> Transaction>writeDynamic() >>>>>>>> >>>>>>>>> .by(Transaction::getType) >>>>>>>> >>>>>>>>> .via(tx -> tx.getType().toFields(tx), // Convert >>>>>>>> the data to be written to CSVSink >>>>>>>> >>>>>>>>> type -> new CSVSink(type.getFieldNames())) >>>>>>>> >>>>>>>>> .to(".../path/to/") >>>>>>>> >>>>>>>>> .withNaming(type -> defaultNaming(type + >>>>>>>> "-transactions", ".csv")); >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> we could do something similar for many JdbcIO reads: >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> PCollection<Bar> bars; // user-specific type from which >>>>>>>> all the read parameters can be inferred >>>>>>>> >>>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, >>>>>>>> Moo>readAll() >>>>>>>> >>>>>>>>> .fromQuery(bar -> ...compute query for this bar...) >>>>>>>> >>>>>>>>> .withMapper((bar, resultSet) -> new Moo(...)) >>>>>>>> >>>>>>>>> .withBatchSize(bar -> ...compute batch size for this >>>>>>>> bar...) >>>>>>>> >>>>>>>>> ...etc); >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía < >>>>>>>> ieme...@gmail.com> wrote: >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> Hello, >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> (my excuses for the long email but this requires context) >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> As part of the move from Source based IOs to DoFn based >>>>>>>> ones, one pattern >>>>>>>> >>>>>>>>>> emerged due to the composable nature of DoFn. The idea >>>>>>>> is to have a different >>>>>>>> >>>>>>>>>> kind of composable read where we take a PCollection of >>>>>>>> different sorts of >>>>>>>> >>>>>>>>>> intermediate specifications e.g. tables, queries, etc., >>>>>>>> for example: >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> JdbcIO: >>>>>>>> >>>>>>>>>> ReadAll<ParameterT, OutputT> extends >>>>>>>> >>>>>>>>>> PTransform<PCollection<ParameterT>, PCollection<OutputT>> >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> RedisIO: >>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<String>, >>>>>>>> PCollection<KV<String, String>>> >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> HBaseIO: >>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>, >>>>>>>> PCollection<Result>> >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> These patterns enabled richer use cases like doing >>>>>>>> multiple queries in the same >>>>>>>> >>>>>>>>>> Pipeline, querying based on key patterns or querying >>>>>>>> from multiple tables at the >>>>>>>> >>>>>>>>>> same time but came with some maintenance issues: >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> - We ended up needing to add to the ReadAll transforms >>>>>>>> the parameters for >>>>>>>> >>>>>>>>>> missing information, so we ended up with lots of >>>>>>>> duplicated methods and >>>>>>>> >>>>>>>>>> error-prone code copied from the Read transforms into the >>>>>>>> ReadAll transforms.
>>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> - When you require new parameters you have to expand the >>>>>>>> input parameters of the >>>>>>>> >>>>>>>>>> intermediary specification into something that >>>>>>>> resembles the full `Read` >>>>>>>> >>>>>>>>>> definition. For example, imagine you want to read from >>>>>>>> multiple tables or >>>>>>>> >>>>>>>>>> servers as part of the same pipeline but this was not >>>>>>>> in the intermediate >>>>>>>> >>>>>>>>>> specification; you end up adding those extra methods >>>>>>>> (duplicating more code) >>>>>>>> >>>>>>>>>> just to get close to being like the full Read spec. >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> - If new parameters are added to the Read method we end >>>>>>>> up adding them >>>>>>>> >>>>>>>>>> systematically to the ReadAll transform too so they >>>>>>>> are taken into account. >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> Due to these issues I recently did a change to test a >>>>>>>> new approach that is >>>>>>>> >>>>>>>>>> simpler, more complete and maintainable. The code became: >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> HBaseIO: >>>>>>>> >>>>>>>>>> ReadAll extends PTransform<PCollection<Read>, >>>>>>>> PCollection<Result>> >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> With this approach users gain the benefits of improvements >>>>>>>> on parameters of the normal >>>>>>>> >>>>>>>>>> Read because they have the full Read parameters available. >>>>>>>> But of course there are >>>>>>>> >>>>>>>>>> some minor caveats: >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> 1. You need to push some information into normal Reads, >>>>>>>> for example >>>>>>>> >>>>>>>>>> partition boundary information or Restriction >>>>>>>> information (in the SDF >>>>>>>> >>>>>>>>>> case). Notice that this consistent approach of >>>>>>>> ReadAll produces a simple >>>>>>>> >>>>>>>>>> pattern that ends up being almost reusable between >>>>>>>> IOs (e.g. the non-SDF >>>>>>>> >>>>>>>>>> case): >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> public static class ReadAll extends >>>>>>>> PTransform<PCollection<Read>, >>>>>>>> >>>>>>>>>> PCollection<SolrDocument>> { >>>>>>>> >>>>>>>>>> @Override >>>>>>>> >>>>>>>>>> public PCollection<SolrDocument> >>>>>>>> expand(PCollection<Read> input) { >>>>>>>> >>>>>>>>>> return input >>>>>>>> >>>>>>>>>> .apply("Split", ParDo.of(new SplitFn())) >>>>>>>> >>>>>>>>>> .apply("Reshuffle", Reshuffle.viaRandomKey()) >>>>>>>> >>>>>>>>>> .apply("Read", ParDo.of(new ReadFn())); >>>>>>>> >>>>>>>>>> } >>>>>>>> >>>>>>>>>> } >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> 2. If you are using generic types for the ReadAll results >>>>>>>> you must have the >>>>>>>> >>>>>>>>>> Coders used in its definition and require consistent >>>>>>>> types from the data >>>>>>>> >>>>>>>>>> sources; in practice this means we need to add extra >>>>>>>> withCoder method(s) on >>>>>>>> >>>>>>>>>> ReadAll but not on the full specs. >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> At the moment HBaseIO and SolrIO already follow this >>>>>>>> ReadAll pattern. RedisIO >>>>>>>> >>>>>>>>>> and CassandraIO already have WIP PRs to do so. So I >>>>>>>> wanted to bring this subject >>>>>>>> >>>>>>>>>> to the mailing list to see your opinions, and if you see >>>>>>>> any sort of issues that >>>>>>>> >>>>>>>>>> we might be missing with this idea.
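To make the two caveats above concrete, a sketch (hypothetical Read accessors; the real SolrIO/HBaseIO details may differ) of the DoFns behind that expansion:

  // Sketch of the DoFns used by the ReadAll expansion above: SplitFn turns one Read into
  // many restricted Reads, ReadFn is the "real" read that uses the full Read parameters.
  static class SplitFn extends DoFn<Read, Read> {
    @ProcessElement
    public void process(@Element Read read, OutputReceiver<Read> out) {
      for (String shard : listShards(read)) {        // listShards/withShard are hypothetical
        out.output(read.withShard(shard));
      }
    }
  }

  static class ReadFn extends DoFn<Read, SolrDocument> {
    @ProcessElement
    public void process(@Element Read read, OutputReceiver<SolrDocument> out) {
      for (SolrDocument doc : executeRead(read)) {   // executeRead is hypothetical
        out.output(doc);
      }
    }
  }

For caveat 2 with a generic output type, the extra withCoder(Coder<T>) method would simply be stored by ReadAll and applied via setCoder(...) on the output of the "Read" step, since the coder cannot be inferred from the Read elements themselves.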
>>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> Also I would like to see if we have consensus to start >>>>>>>> using consistently the >>>>>>>> >>>>>>>>>> terminology of ReadAll transforms based on Read and the >>>>>>>> readAll() method for new >>>>>>>> >>>>>>>>>> IOs (at this point probably doing this in the only >>>>>>>> remaining inconsistent >>>>>>>> >>>>>>>>>> place, JdbcIO, might not be a good idea but apart from >>>>>>>> this we should be ok). >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> I mention this because the recent PR on KafkaIO based on >>>>>>>> SDF is doing something >>>>>>>> >>>>>>>>>> similar to the old pattern but being called ReadAll and >>>>>>>> maybe it is worth being >>>>>>>> >>>>>>>>>> consistent for the benefit of users. >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> Regards, >>>>>>>> >>>>>>>>>> Ismaël