Sorry for the typo. I mean I think we can go with *(3)* and (4): use the data type that is schema-aware as the input of ReadAll.
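
That is, roughly comparing the shapes in KafkaIO terms (KafkaSourceDescription as in Boyuan's branch; this is only a sketch of the signatures, not the actual code):

  // (1): the element is the construction-time transform itself
  ReadAll extends PTransform<PCollection<KafkaIO.Read<K, V>>, PCollection<KafkaRecord<K, V>>>

  // (3)/(4): the element is a schema-aware description of what to read
  ReadAll extends PTransform<PCollection<KafkaSourceDescription>, PCollection<KafkaRecord<K, V>>>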
On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <[email protected]> wrote:

> Thanks for the summary, Cham!
>
> I think we can go with (2) and (4): use the data type that is schema-aware as the input of ReadAll.
>
> Converting Read into ReadAll helps us to stick with SDF-like IO. But only having (3) is not enough to solve the problem of using ReadAll in the x-lang case.
>
> The key point of ReadAll is that the input type of ReadAll should be able to cross language boundaries and stay compatible across updates/downgrades. After investigating some possibilities (pure Java POJO with a custom coder, protobuf, row/schema) for the Kafka use case, we found that row/schema fits our needs best. Here comes (4). I believe that using Read as the input of ReadAll makes sense in some cases, but I also think not all IOs have the same need. I would treat Read as a special type as long as the Read is schema-aware.
>
> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <[email protected]> wrote:
>
>> I see. So it seems like there are three options discussed so far when it comes to defining source descriptors for ReadAll-type transforms:
>>
>> (1) Use the Read PTransform as the element type of the input PCollection
>> (2) Use a POJO that describes the source as the data element of the input PCollection
>> (3) Provide a converter as a function to the Read transform which essentially will convert it to a ReadAll (what Eugene mentioned)
>>
>> I feel like (3) is more suitable for a related set of source descriptions such as files.
>> (1) will allow the most code reuse but seems like it will make it hard to use the ReadAll transform as a cross-language transform and will break the separation of construction-time and runtime constructs.
>> (2) could result in less code reuse if we are not careful but will make the transform easier to use as a cross-language transform without additional modifications.
>>
>> Also, with SDF, we can create ReadAll-like transforms that are more efficient. So we might be able to just define all sources in that format and make Read transforms just an easy-to-use composite built on top of that (by adding a preceding Create transform).
>>
>> Thanks,
>> Cham
>>
>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <[email protected]> wrote:
>>
>>> I believe we do require PTransforms to be serializable since anonymous DoFns typically capture the enclosing PTransform.
>>>
>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <[email protected]> wrote:
>>>
>>>> Seems like Read in PCollection<Read> refers to a transform, at least here:
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>>
>>>> I'm in favour of separating construction-time transforms from execution-time data objects that we store in PCollections, as Luke mentioned. Also, we don't guarantee that PTransform is serializable, so users have the additional complexity of providing a coder whenever a PTransform is used as a data object.
>>>> Also, agree with Boyuan that using simple Java objects that are convertible to Beam Rows allows us to make these transforms available to other SDKs through cross-language transforms. Using transforms or complex sources as data objects will probably make this difficult.
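>>>>
>>>> For instance, something along these lines is what a pipeline author has to do today when the element type is the transform itself (FooIO is just a stand-in here, and this assumes its Read happens to be Java-serializable):
>>>>
>>>>   // Hypothetical sketch: with PCollection<FooIO.Read> there is no schema to
>>>>   // infer a coder from, so a coder has to be supplied explicitly.
>>>>   PCollection<FooIO.Read> reads =
>>>>       pipeline.apply(
>>>>           Create.of(FooIO.read().withTable("a"), FooIO.read().withTable("b"))
>>>>               .withCoder(SerializableCoder.of(FooIO.Read.class)));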
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <[email protected]> wrote:
>>>>
>>>>> Hi Ismael,
>>>>>
>>>>> I think the ReadAll in the IO connector refers to the IO with the SDF implementation, regardless of the type of input, whereas Read refers to UnboundedSource. One major pushback against using KafkaIO.Read as the source description is that not all configurations of KafkaIO.Read are meaningful to populate during execution time. Also, when thinking about x-lang usage, making the source description work across language boundaries is necessary as well. As Luke mentioned, it's quite easy to infer a Schema from an AutoValue object: KafkaSourceDescription.java
>>>>> <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java#L41>.
>>>>> Then the coder of this schema-aware object will be a SchemaCoder
>>>>> <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java#L84>.
>>>>> When crossing language boundaries, it's also easy to convert a Row into the source description: Convert.fromRows
>>>>> <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1480>.
>>>>>
>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <[email protected]> wrote:
>>>>>
>>>>>> To provide additional context, the KafkaIO ReadAll transform takes a PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is a POJO that contains the configurable parameters for reading from Kafka. This is different from the pattern that Ismael listed because those take PCollection<Read> as input, and the Read is the same Read PTransform class used for the non-ReadAll case.
>>>>>>
>>>>>> The KafkaSourceDescriptor does lead to duplication, since parameters used to configure the transform have to be copied over to the source descriptor, but it decouples how a transform is specified from the object that describes what needs to be done. I believe Ismael's point is that we wouldn't need such a decoupling.
>>>>>>
>>>>>> Another area that hasn't been discussed, and which I believe is a non-issue, is that the Beam Java SDK has the most IO connectors and we would want to use the IO implementations within Beam Go and Beam Python. This brings in its own set of issues related to versioning and compatibility for the wire format and how one parameterizes such transforms. The wire format issue can be solved with either approach by making sure that the cross-language expansion always takes the well-known format (whatever it may be) and converts it into the Read/KafkaSourceDescriptor/... object that is then passed to the ReadAll transform. Boyuan has been looking into making the KafkaSourceDescriptor have a schema so it can be represented as a row, and this can be done easily using the AutoValue integration (I don't believe there is anything preventing someone from writing a schema row -> Read -> row adapter, or also using the AutoValue configuration if the transform is also an AutoValue).
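>>>>>>
>>>>>> As a rough sketch of that AutoValue/schema integration (the class and field names below are made up for illustration, not the actual KafkaSourceDescriptor):
>>>>>>
>>>>>> import com.google.auto.value.AutoValue;
>>>>>> import org.apache.beam.sdk.schemas.AutoValueSchema;
>>>>>> import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
>>>>>> import org.apache.beam.sdk.schemas.transforms.Convert;
>>>>>> import org.apache.beam.sdk.values.PCollection;
>>>>>> import org.apache.beam.sdk.values.Row;
>>>>>>
>>>>>> // The schema is inferred from the AutoValue getters, so (as Boyuan noted)
>>>>>> // the coder of this type is a SchemaCoder and it has a Row representation.
>>>>>> @DefaultSchema(AutoValueSchema.class)
>>>>>> @AutoValue
>>>>>> public abstract class SourceDescriptor {
>>>>>>   public abstract String getTopic();
>>>>>>   public abstract Integer getPartition();
>>>>>>
>>>>>>   public static SourceDescriptor of(String topic, Integer partition) {
>>>>>>     return new AutoValue_SourceDescriptor(topic, partition);
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> // Given a PCollection<Row> named rows (e.g. received across the language
>>>>>> // boundary), it can be converted back into the descriptor type:
>>>>>> PCollection<SourceDescriptor> descriptors =
>>>>>>     rows.apply(Convert.fromRows(SourceDescriptor.class));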
>>>>>>
>>>>>> I would be more for the code duplication and separation of concerns provided by using a different object to represent the contents of the PCollection from the pipeline construction time PTransform.
>>>>>>
>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Ismael,
>>>>>>>
>>>>>>> Thanks for taking this on. Have you considered an approach similar (or dual) to FileIO.write(), where we in a sense also have to configure a dynamic number of different IO transforms of the same type (file writes)?
>>>>>>>
>>>>>>> E.g. here is how in this example we configure many aspects of many file writes:
>>>>>>>
>>>>>>> transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
>>>>>>>     .by(Transaction::getType)
>>>>>>>     .via(tx -> tx.getType().toFields(tx),  // Convert the data to be written to CSVSink
>>>>>>>          type -> new CSVSink(type.getFieldNames()))
>>>>>>>     .to(".../path/to/")
>>>>>>>     .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
>>>>>>>
>>>>>>> we could do something similar for many JdbcIO reads:
>>>>>>>
>>>>>>> PCollection<Bar> bars;  // user-specific type from which all the read parameters can be inferred
>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>>>>>>>     .fromQuery(bar -> ...compute query for this bar...)
>>>>>>>     .withMapper((bar, resultSet) -> new Moo(...))
>>>>>>>     .withBatchSize(bar -> ...compute batch size for this bar...)
>>>>>>>     ...etc);
>>>>>>>
>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> (my excuses for the long email but this requires context)
>>>>>>>>
>>>>>>>> As part of the move from Source-based IOs to DoFn-based ones, one pattern emerged due to the composable nature of DoFn. The idea is to have a different kind of composable read where we take a PCollection of different sorts of intermediate specifications, e.g. tables, queries, etc. For example:
>>>>>>>>
>>>>>>>> JdbcIO:
>>>>>>>> ReadAll<ParameterT, OutputT> extends PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>>>>>>>>
>>>>>>>> RedisIO:
>>>>>>>> ReadAll extends PTransform<PCollection<String>, PCollection<KV<String, String>>>
>>>>>>>>
>>>>>>>> HBaseIO:
>>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>, PCollection<Result>>
>>>>>>>>
>>>>>>>> These patterns enabled richer use cases like doing multiple queries in the same pipeline, querying based on key patterns or querying from multiple tables at the same time, but they came with some maintenance issues:
>>>>>>>>
>>>>>>>> - We ended up needing to add to the ReadAll transforms the parameters for missing information, so we ended up duplicating lots of 'with' methods and error-prone code from the Read transforms into the ReadAll transforms.
>>>>>>>>
>>>>>>>> - When you require new parameters you have to expand the input parameters of the intermediary specification into something that resembles the full `Read` definition. For example, imagine you want to read from multiple tables or servers as part of the same pipeline but this was not in the intermediate specification: you end up adding those extra methods (duplicating more code) just to get close to the full Read spec.
>>>>>>>>
>>>>>>>> - If new parameters are added to the Read method, we end up having to add them systematically to the ReadAll transform too so they are taken into account.
>>>>>>>>
>>>>>>>> Due to these issues I recently did a change to test a new approach that is simpler, more complete and maintainable. The code became:
>>>>>>>>
>>>>>>>> HBaseIO:
>>>>>>>> ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>
>>>>>>>>
>>>>>>>> With this approach users gain the benefits of improvements to the parameters of the normal Read because they have the full Read parameters available. But of course there are some minor caveats:
>>>>>>>>
>>>>>>>> 1. You need to push some information into the normal Reads, for example partition boundary information or Restriction information (in the SDF case). Notice that this consistent approach to ReadAll produces a simple pattern that ends up being almost reusable between IOs (e.g. the non-SDF case):
>>>>>>>>
>>>>>>>>   public static class ReadAll extends PTransform<PCollection<Read>, PCollection<SolrDocument>> {
>>>>>>>>     @Override
>>>>>>>>     public PCollection<SolrDocument> expand(PCollection<Read> input) {
>>>>>>>>       return input
>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>>>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>>
>>>>>>>> 2. If you are using generic types for the ReadAll results you must have the Coders used in its definition and require consistent types from the data sources; in practice this means we need to add extra withCoder method(s) on ReadAll, but not the full specs.
>>>>>>>>
>>>>>>>> At the moment HBaseIO and SolrIO already follow this ReadAll pattern. RedisIO and CassandraIO already have WIP PRs to do so. So I wanted to bring this subject to the mailing list to hear your opinions, and to see if you spot any sort of issues that we might be missing with this idea.
>>>>>>>>
>>>>>>>> Also I would like to see if we have consensus to consistently use the terminology of ReadAll transforms based on Read and the readAll() method for new IOs (at this point, doing this in the only remaining inconsistent place, JdbcIO, might not be a good idea, but apart from this we should be ok).
>>>>>>>>
>>>>>>>> I mention this because the recent PR on KafkaIO based on SDF is doing something similar to the old pattern while being called ReadAll, and maybe it is worth being consistent for the benefit of users.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Ismaël
