Thanks for the summary, Cham! I think we can go with (2) and (4): use a schema-aware data type as the input of ReadAll.

Converting Read into ReadAll helps us stick with SDF-like IO, but having only (3) is not enough to solve the problem of using ReadAll in the x-lang case. The key point of ReadAll is that its input type should be able to cross language boundaries and stay compatible across updates/downgrades. After investigating several possibilities (a plain Java POJO with a custom coder, protobuf, Row/Schema) for the Kafka use case, we found that Row/Schema fits our needs best; hence (4). I believe that using Read as the input of ReadAll makes sense in some cases, but I also think not all IOs have the same need. I would treat Read as a special type, as long as the Read is schema-aware.
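(Editorial illustration, not part of the thread: a minimal sketch of what a schema-aware source descriptor along the lines of (2) + (4) might look like. MySourceDescriptor and its fields are hypothetical; the real example is the KafkaSourceDescription class linked further down the thread.)

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

// Hypothetical descriptor: @DefaultSchema(AutoValueSchema.class) lets Beam infer a
// Schema for the object, so its coder is a SchemaCoder and the same data can be
// represented as a Row when crossing language boundaries.
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class MySourceDescriptor implements Serializable {
  public abstract String getTopic();      // hypothetical field
  public abstract Integer getPartition(); // hypothetical field

  public static MySourceDescriptor of(String topic, Integer partition) {
    return new AutoValue_MySourceDescriptor(topic, partition);
  }
}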
On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <[email protected]> wrote:

> I see. So it seems like there are three options discussed so far when it comes to defining source descriptors for ReadAll-type transforms:
>
> (1) Use the Read PTransform as the element type of the input PCollection
> (2) Use a POJO that describes the source as the data element of the input PCollection
> (3) Provide a converter as a function to the Read transform which essentially will convert it to a ReadAll (what Eugene mentioned)
>
> I feel like (3) is more suitable for a related set of source descriptions such as files.
> (1) will allow the most code reuse but seems like it will make it hard to use the ReadAll transform as a cross-language transform and will break the separation between construction-time and runtime constructs.
> (2) could result in less code reuse if we are not careful but will make the transform easier to use as a cross-language transform without additional modifications.
>
> Also, with SDF, we can create ReadAll-like transforms that are more efficient. So we might be able to just define all sources in that format and make Read transforms an easy-to-use composite built on top of that (by adding a preceding Create transform).
>
> Thanks,
> Cham
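(Editorial sketch, not from the thread, of that last idea: define the source once as an SDF-based ReadAll and make Read a thin composite over it. Read, ReadAll, Result, and MySourceDescriptor are hypothetical stand-ins, not actual Beam classes.)

import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical composite: Read wraps a single descriptor in a Create and defers all
// real work to the SDF-based ReadAll, so the source logic is defined only once.
public class Read extends PTransform<PBegin, PCollection<Result>> {
  private final MySourceDescriptor descriptor;

  Read(MySourceDescriptor descriptor) {
    this.descriptor = descriptor;
  }

  @Override
  public PCollection<Result> expand(PBegin input) {
    return input
        .apply("Create", Create.of(descriptor))
        // ReadAll: PTransform<PCollection<MySourceDescriptor>, PCollection<Result>>
        .apply("ReadAll", new ReadAll());
  }
}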
> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <[email protected]> wrote:
>
>> I believe we do require PTransforms to be serializable, since anonymous DoFns typically capture the enclosing PTransform.
>>
>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <[email protected]> wrote:
>>
>>> Seems like Read in PCollection<Read> refers to a transform, at least here: https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>
>>> I'm in favour of separating construction-time transforms from the execution-time data objects that we store in PCollections, as Luke mentioned. Also, we don't guarantee that PTransform is serializable, so users have the additional complexity of providing a coder whenever a PTransform is used as a data object.
>>> I also agree with Boyuan that using simple Java objects that are convertible to Beam Rows allows us to make these transforms available to other SDKs through cross-language transforms. Using transforms or complex sources as data objects will probably make this difficult.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <[email protected]> wrote:
>>>
>>>> Hi Ismael,
>>>>
>>>> I think the ReadAll in the IO connector refers to the IO with the SDF implementation regardless of the input type, whereas Read refers to UnboundedSource. One major pushback on using KafkaIO.Read as the source description is that not all configurations of KafkaIO.Read are meaningful to populate during execution time. Also, when thinking about x-lang usage, making the source description work across language boundaries is necessary as well. As Luke mentioned, it's quite easy to infer a Schema from an AutoValue object: KafkaSourceDescription.java <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java#L41>. The coder of this schema-aware object will then be a SchemaCoder <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java#L84>. When crossing language boundaries, it's also easy to convert a Row into the source description: Convert.fromRows <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1480>.
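(Editorial sketch of the cross-language hand-off described just above: Rows arriving over the expansion-service boundary are converted back into the schema-aware descriptor before being fed to ReadAll. MySourceDescriptor is the hypothetical descriptor from the first sketch; the real code is the Convert.fromRows link above.)

import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// rows: e.g. descriptors produced by a Python or Go pipeline and handed over as Rows
PCollection<Row> rows = ...;
PCollection<MySourceDescriptor> descriptors =
    rows.apply(Convert.fromRows(MySourceDescriptor.class));
// descriptors can now be passed to the SDF-based ReadAll transform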
>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> To provide additional context, the KafkaIO ReadAll transform takes a PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is a POJO that contains the configurable parameters for reading from Kafka. This is different from the pattern that Ismael listed, because those take PCollection<Read> as input, where Read is the same Read PTransform class used for the non-ReadAll case.
>>>>>
>>>>> The KafkaSourceDescriptor does lead to duplication, since parameters used to configure the transform have to be copied over to the source descriptor, but it decouples how a transform is specified from the object that describes what needs to be done. I believe Ismael's point is that we wouldn't need such a decoupling.
>>>>>
>>>>> Another area that hasn't been discussed, and that I believe is a non-issue, is that the Beam Java SDK has the most IO connectors and we would want to use those IO implementations within Beam Go and Beam Python. This brings its own set of issues related to versioning and compatibility for the wire format and how one parameterizes such transforms. The wire format issue can be solved with either approach by making sure that the cross-language expansion always takes the well-known format (whatever it may be) and converts it into the Read/KafkaSourceDescriptor/... object that is then passed to the ReadAll transform. Boyuan has been looking into making the KafkaSourceDescriptor have a schema so it can be represented as a row, and this can be done easily using the AutoValue integration (I don't believe there is anything preventing someone from writing a schema row -> Read -> row adapter, or from using the AutoValue configuration if the transform is also an AutoValue).
>>>>>
>>>>> I would be more for the code duplication and separation of concerns provided by using a different object to represent the contents of the PCollection than the pipeline-construction-time PTransform.
>>>>>
>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <[email protected]> wrote:
>>>>>
>>>>>> Hi Ismael,
>>>>>>
>>>>>> Thanks for taking this on. Have you considered an approach similar (or dual) to FileIO.write(), where we in a sense also have to configure a dynamic number of different IO transforms of the same type (file writes)?
>>>>>>
>>>>>> E.g. this is how we configure many aspects of many file writes:
>>>>>>
>>>>>> transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
>>>>>>     .by(Transaction::getType)
>>>>>>     .via(tx -> tx.getType().toFields(tx),  // Convert the data to be written to CSVSink
>>>>>>          type -> new CSVSink(type.getFieldNames()))
>>>>>>     .to(".../path/to/")
>>>>>>     .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
>>>>>>
>>>>>> We could do something similar for many JdbcIO reads:
>>>>>>
>>>>>> PCollection<Bar> bars;  // user-specific type from which all the read parameters can be inferred
>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>>>>>>     .fromQuery(bar -> ...compute query for this bar...)
>>>>>>     .withMapper((bar, resultSet) -> new Moo(...))
>>>>>>     .withBatchSize(bar -> ...compute batch size for this bar...)
>>>>>>     ...etc);
>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <[email protected]> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> (my excuses for the long email, but this requires context)
>>>>>>>
>>>>>>> As part of the move from Source-based IOs to DoFn-based ones, one pattern emerged due to the composable nature of DoFn. The idea is to have a different kind of composable read where we take a PCollection of different sorts of intermediate specifications, e.g. tables, queries, etc. For example:
>>>>>>>
>>>>>>> JdbcIO:
>>>>>>> ReadAll<ParameterT, OutputT> extends PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>>>>>>>
>>>>>>> RedisIO:
>>>>>>> ReadAll extends PTransform<PCollection<String>, PCollection<KV<String, String>>>
>>>>>>>
>>>>>>> HBaseIO:
>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>, PCollection<Result>>
>>>>>>>
>>>>>>> These patterns enabled richer use cases like doing multiple queries in the same pipeline, querying based on key patterns, or querying multiple tables at the same time, but they came with some maintenance issues:
>>>>>>>
>>>>>>> - We ended up needing to add the parameters for the missing information to the ReadAll transforms, so we ended up with lots of duplicated `with` methods and error-prone code copied from the Read transforms into the ReadAll transforms.
>>>>>>>
>>>>>>> - When you require new parameters, you have to expand the input parameters of the intermediary specification into something that resembles the full `Read` definition. For example, imagine you want to read from multiple tables or servers as part of the same pipeline but this was not in the intermediate specification: you end up adding those extra methods (duplicating more code) just to get close to the full `Read` spec.
>>>>>>>
>>>>>>> - If new parameters are added to the Read method, we end up adding them systematically to the ReadAll transform too so they are taken into account.
>>>>>>>
>>>>>>> Due to these issues I recently made a change to test a new approach that is simpler, more complete and more maintainable. The code became:
>>>>>>>
>>>>>>> HBaseIO:
>>>>>>> ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>
>>>>>>>
>>>>>>> With this approach users benefit from improvements to the parameters of the normal Read, because they can count on the full set of Read parameters. But of course there are some minor caveats:
>>>>>>>
>>>>>>> 1. You need to push some information into the normal Reads, for example partition boundary information or Restriction information (in the SDF case). Notice that this consistent approach to ReadAll produces a simple pattern that ends up being almost reusable between IOs (e.g. the non-SDF case):
>>>>>>>
>>>>>>>   public static class ReadAll extends PTransform<PCollection<Read>, PCollection<SolrDocument>> {
>>>>>>>     @Override
>>>>>>>     public PCollection<SolrDocument> expand(PCollection<Read> input) {
>>>>>>>       return input
>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>>>>>>     }
>>>>>>>   }
>>>>>>>
>>>>>>> 2. If you are using generic types for the ReadAll results, you must have the Coders used in its definition and require consistent types from the data sources; in practice this means we need to add extra withCoder method(s) on ReadAll, but not the full specs.
>>>>>>>
>>>>>>> At the moment HBaseIO and SolrIO already follow this ReadAll pattern, and RedisIO and CassandraIO have WIP PRs to do so. So I wanted to bring this subject to the mailing list to get your opinions, and to see whether you spot any sort of issues that we might be missing with this idea.
>>>>>>>
>>>>>>> I would also like to see if we have consensus to consistently use the terminology of ReadAll transforms based on Read and the readAll() method for new IOs (at this point, retrofitting this in the only remaining inconsistent place, JdbcIO, might not be a good idea, but apart from that we should be ok).
>>>>>>>
>>>>>>> I mention this because the recent PR on KafkaIO based on SDF is doing something similar to the old pattern but is called ReadAll, and it may be worth being consistent for the benefit of users.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Ismaël
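(A final editorial sketch for readers unfamiliar with the composite shown in Ismaël's email: roughly what the SplitFn/ReadFn pair might look like in the non-SDF case. This is not the actual Beam implementation; the DoFn bodies are stubs with comments describing what the real code would do.)

import org.apache.beam.sdk.io.solr.SolrIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.solr.common.SolrDocument;

// Fans a fully-specified Read out into smaller Reads (e.g. one per shard or replica)
// so that the following Reshuffle can redistribute the work across workers.
class SplitFn extends DoFn<SolrIO.Read, SolrIO.Read> {
  @ProcessElement
  public void process(@Element SolrIO.Read read, OutputReceiver<SolrIO.Read> out) {
    // Hypothetical: compute per-shard sub-reads from `read` and output one Read per shard.
    out.output(read); // trivial passthrough stands in for real splitting logic
  }
}

// Executes each fully-specified Read and emits the resulting documents.
class ReadFn extends DoFn<SolrIO.Read, SolrDocument> {
  @ProcessElement
  public void process(@Element SolrIO.Read read, OutputReceiver<SolrDocument> out) {
    // Hypothetical: open a Solr client with the connection configuration carried by
    // `read`, run the query, and emit each SolrDocument via out.output(...).
  }
}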
