I believe we do require PTransforms to be serializable since anonymous DoFns typically capture the enclosing PTransform.
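For illustration, a rough sketch (hypothetical transform name, not taken from any existing IO) of how that capture happens: a DoFn declared as an anonymous inner class inside expand() holds an implicit reference to the enclosing PTransform instance, so serializing the DoFn drags the transform along with it.

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical transform, for illustration only.
public class MyRead extends PTransform<PCollection<String>, PCollection<String>> {
  private final String table; // configuration captured at construction time

  public MyRead(String table) {
    this.table = table;
  }

  @Override
  public PCollection<String> expand(PCollection<String> input) {
    return input.apply(
        ParDo.of(
            new DoFn<String, String>() { // anonymous inner class: implicitly holds MyRead.this
              @ProcessElement
              public void process(@Element String element, OutputReceiver<String> out) {
                // Referencing 'table' really reads MyRead.this.table, so serializing this
                // DoFn serializes the enclosing MyRead transform as well.
                out.output(table + ":" + element);
              }
            }));
  }
}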
On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <[email protected]> wrote:

> Seems like Read in PCollection<Read> refers to a transform, at least here:
> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>
> I'm in favour of separating construction-time transforms from execution-time data objects that we store in PCollections, as Luke mentioned. Also, we don't guarantee that PTransform is serializable, so users have the additional complexity of providing a coder whenever a PTransform is used as a data object.
>
> Also, I agree with Boyuan that using simple Java objects that are convertible to Beam Rows allows us to make these transforms available to other SDKs through cross-language transforms. Using transforms or complex sources as data objects will probably make this difficult.
>
> Thanks,
> Cham
>
> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <[email protected]> wrote:
>
>> Hi Ismael,
>>
>> I think the ReadAll in the IO connector refers to the SDF implementation of the IO regardless of the type of input, whereas Read refers to UnboundedSource. One major pushback on using KafkaIO.Read as the source description is that not all configurations of KafkaIO.Read are meaningful to populate during execution time. Also, when thinking about x-lang usage, making the source description work across language boundaries is necessary. As Luke mentioned, it's quite easy to infer a Schema from an AutoValue object: KafkaSourceDescription.java
>> <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java#L41>.
>> The coder of this schema-aware object will then be a SchemaCoder
>> <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java#L84>.
>> When crossing language boundaries, it's also easy to convert a Row into the source description: Convert.fromRows
>> <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1480>.
>>
>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <[email protected]> wrote:
>>
>>> To provide additional context, the KafkaIO ReadAll transform takes a PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is a POJO that contains the configurable parameters for reading from Kafka. This is different from the pattern that Ismael listed, because those take PCollection<Read> as input and the Read is the same Read PTransform class used for the non-ReadAll case.
>>>
>>> The KafkaSourceDescriptor does lead to duplication, since parameters used to configure the transform have to be copied over to the source descriptor, but it decouples how a transform is specified from the object that describes what needs to be done. I believe Ismael's point is that we wouldn't need such a decoupling.
>>>
>>> Another area that hasn't been discussed, and which I believe is a non-issue, is that the Beam Java SDK has the most IO connectors and we would want to use those IO implementations within Beam Go and Beam Python. This brings in its own set of issues related to versioning and compatibility for the wire format and how one parameterizes such transforms.
>>>
>>> The wire format issue can be solved with either approach by making sure that the cross-language expansion always takes the well-known format (whatever it may be) and converts it into the Read/KafkaSourceDescriptor/... object that is then passed to the ReadAll transform. Boyuan has been looking to make the KafkaSourceDescriptor have a schema so it can be represented as a row, and this can be done easily using the AutoValue integration (I don't believe there is anything preventing someone from writing a schema row -> Read -> row adapter, or also using the AutoValue configuration if the transform is also an AutoValue).
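>>>
>>> To make that concrete, a rough sketch of such a schema-aware AutoValue descriptor (hypothetical, simplified field names; the KafkaSourceDescription linked above is the real implementation):
>>>
>>> import com.google.auto.value.AutoValue;
>>> import org.apache.beam.sdk.schemas.AutoValueSchema;
>>> import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
>>> import org.apache.beam.sdk.schemas.transforms.Convert;
>>> import org.apache.beam.sdk.values.PCollection;
>>> import org.apache.beam.sdk.values.Row;
>>>
>>> // Beam infers a Schema (and hence a SchemaCoder) from the AutoValue getters.
>>> @DefaultSchema(AutoValueSchema.class)
>>> @AutoValue
>>> public abstract class MySourceDescriptor {
>>>   public abstract String getTopic();
>>>   public abstract Integer getPartition();
>>>
>>>   public static MySourceDescriptor of(String topic, Integer partition) {
>>>     return new AutoValue_MySourceDescriptor(topic, partition);
>>>   }
>>>
>>>   // When crossing the language boundary, Rows produced by another SDK can be
>>>   // converted back into this descriptor type before the ReadAll expansion.
>>>   public static PCollection<MySourceDescriptor> fromRows(PCollection<Row> rows) {
>>>     return rows.apply(Convert.fromRows(MySourceDescriptor.class));
>>>   }
>>> }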
>>>
>>> I would lean more towards the code duplication and the separation of concerns provided by using an object distinct from the pipeline-construction-time PTransform to represent the contents of the PCollection.
>>>
>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <[email protected]> wrote:
>>>
>>>> Hi Ismael,
>>>>
>>>> Thanks for taking this on. Have you considered an approach similar (or dual) to FileIO.write(), where we in a sense also have to configure a dynamic number of different IO transforms of the same type (file writes)?
>>>>
>>>> E.g. how in this example we configure many aspects of many file writes:
>>>>
>>>> transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
>>>>     .by(Transaction::getType)
>>>>     .via(tx -> tx.getType().toFields(tx),  // Convert the data to be written to CSVSink
>>>>          type -> new CSVSink(type.getFieldNames()))
>>>>     .to(".../path/to/")
>>>>     .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
>>>>
>>>> we could do something similar for many JdbcIO reads:
>>>>
>>>> PCollection<Bar> bars;  // user-specific type from which all the read parameters can be inferred
>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>>>>     .fromQuery(bar -> ...compute query for this bar...)
>>>>     .withMapper((bar, resultSet) -> new Moo(...))
>>>>     .withBatchSize(bar -> ...compute batch size for this bar...)
>>>>     ...etc);
>>>>
>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <[email protected]> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> (my apologies for the long email, but this requires context)
>>>>>
>>>>> As part of the move from Source-based IOs to DoFn-based ones, one pattern emerged due to the composable nature of DoFn: a different kind of composable read where we take a PCollection of different sorts of intermediate specifications, e.g. tables, queries, etc. For example:
>>>>>
>>>>> JdbcIO:
>>>>> ReadAll<ParameterT, OutputT> extends PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>>>>>
>>>>> RedisIO:
>>>>> ReadAll extends PTransform<PCollection<String>, PCollection<KV<String, String>>>
>>>>>
>>>>> HBaseIO:
>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>, PCollection<Result>>
>>>>>
>>>>> These patterns enabled richer use cases like doing multiple queries in the same Pipeline, querying based on key patterns or querying from multiple tables at the same time, but they came with some maintenance issues:
>>>>>
>>>>> - We ended up needing to add the parameters for the missing information to the ReadAll transforms, so we ended up with lots of duplicated with* methods and error-prone code copied from the Read transforms into the ReadAll transforms.
>>>>>
>>>>> - When you require new parameters, you have to expand the input parameters of the intermediate specification into something that resembles the full Read definition. For example, imagine you want to read from multiple tables or servers as part of the same pipeline, but this was not in the intermediate specification: you end up adding those extra methods (duplicating more code) just to get close to the full Read spec.
>>>>>
>>>>> - If new parameters are added to Read, we end up having to add them systematically to the ReadAll transform too so that they are taken into account.
>>>>>
>>>>> Due to these issues, I recently made a change to test a new approach that is simpler, more complete and more maintainable. The code became:
>>>>>
>>>>> HBaseIO:
>>>>> ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>
>>>>>
>>>>> With this approach users automatically benefit from improvements to the parameters of the normal Read, because they have the full set of Read parameters available. But of course there are some minor caveats:
>>>>>
>>>>> 1. You need to push some information into the normal Reads, for example partition boundary information or Restriction information (in the SDF case). Notice that this consistent approach to ReadAll produces a simple pattern that ends up being almost reusable between IOs (e.g. the non-SDF case):
>>>>>
>>>>>   public static class ReadAll extends PTransform<PCollection<Read>, PCollection<SolrDocument>> {
>>>>>     @Override
>>>>>     public PCollection<SolrDocument> expand(PCollection<Read> input) {
>>>>>       return input
>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>>>>     }
>>>>>   }
>>>>>
>>>>> 2. If you are using generic types for the ReadAll results, you must have the Coders used in its definition and require consistent types from the data sources; in practice this means we need to add extra withCoder method(s) on ReadAll, but not the full specs (see the sketch below).
>>>>>
>>>>> At the moment HBaseIO and SolrIO already follow this ReadAll pattern, and RedisIO and CassandraIO already have WIP PRs to do so. So I wanted to bring this subject to the mailing list to hear your opinions and to see if there are any issues we might be missing with this idea.
>>>>>
>>>>> Also, I would like to see if we have consensus to start consistently using the terminology of ReadAll transforms based on Read, and the readAll() method, for new IOs (at this point, changing this in the only remaining inconsistent place, JdbcIO, might not be a good idea, but apart from that we should be ok).
>>>>>
>>>>> I mention this because the recent PR on KafkaIO based on SDF is doing something similar to the old pattern but is called ReadAll, and maybe it is worth being consistent for the benefit of users.
>>>>>
>>>>> Regards,
>>>>> Ismaël
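
For reference, a rough sketch of the withCoder variant mentioned in caveat 2 above (hypothetical IO and names, not taken from any existing connector): when ReadAll is generic in its output type, the output Coder cannot be inferred from the Read spec, so the caller supplies it explicitly.

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical read specification; connection/query configuration elided.
class MyRead implements java.io.Serializable {}

// Generic ReadAll: the output Coder must be provided via withCoder because OutputT
// is only known to the caller, not to the IO itself.
class MyReadAll<OutputT> extends PTransform<PCollection<MyRead>, PCollection<OutputT>> {

  private final Coder<OutputT> outputCoder;

  private MyReadAll(Coder<OutputT> outputCoder) {
    this.outputCoder = outputCoder;
  }

  public static <OutputT> MyReadAll<OutputT> withCoder(Coder<OutputT> outputCoder) {
    return new MyReadAll<>(outputCoder);
  }

  @Override
  public PCollection<OutputT> expand(PCollection<MyRead> input) {
    return input
        .apply("Read", ParDo.of(new ReadFn<OutputT>()))
        .setCoder(outputCoder); // set explicitly; cannot be inferred for a generic OutputT
  }

  // Placeholder DoFn; the actual read logic depends on the backing system.
  private static class ReadFn<OutputT> extends DoFn<MyRead, OutputT> {
    @ProcessElement
    public void process(@Element MyRead read, OutputReceiver<OutputT> out) {
      // ... execute the read described by 'read' and emit OutputT results ...
    }
  }
}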
