I believe we do require PTransforms to be serializable since anonymous
DoFns typically capture the enclosing PTransform.
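
A minimal plain-Java sketch of that capture, with no Beam dependency. The
names SerializableFn, NonSerializableTransform and SerializableTransform are
hypothetical stand-ins for DoFn and PTransform, not Beam types; the only
assumption is standard Java serialization.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a DoFn: a serializable function object.
interface SerializableFn extends Serializable {
  String process(String element);
}

// Hypothetical stand-in for a PTransform that is NOT Serializable.
class NonSerializableTransform {
  private final String prefix = "row-";

  SerializableFn makeFn() {
    // The anonymous class reads `prefix`, so it keeps a hidden reference
    // to the enclosing NonSerializableTransform instance.
    return new SerializableFn() {
      @Override
      public String process(String element) {
        return prefix + element;
      }
    };
  }
}

// Same shape, but the enclosing class is Serializable.
class SerializableTransform implements Serializable {
  private final String prefix = "row-";

  SerializableFn makeFn() {
    return new SerializableFn() {
      @Override
      public String process(String element) {
        return prefix + element;
      }
    };
  }
}

class CaptureDemo {
  // Returns true if Java serialization accepts the object graph.
  static boolean canSerialize(Object o) {
    try (ObjectOutputStream out =
        new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(o);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The function from NonSerializableTransform fails to serialize because its
captured outer instance cannot be written, while the one from
SerializableTransform goes through.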

On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <[email protected]>
wrote:

> Seems like Read in PCollection<Read> refers to a transform, at least here:
> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>
> I'm in favour of separating construction time transforms from execution
> time data objects that we store in PCollections as Luke mentioned. Also, we
> don't guarantee that PTransform is serializable so users have the
> additional complexity of providing a coder whenever a PTransform is used
> as a data object.
> Also, agree with Boyuan that using simple Java objects that are
> convertible to Beam Rows allows us to make these transforms available to
> other SDKs through the cross-language transforms. Using transforms or
> complex sources as data objects will probably make this difficult.
>
> Thanks,
> Cham
>
>
>
> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <[email protected]> wrote:
>
>> Hi Ismael,
>>
>> I think the ReadAll in the IO connector refers to the IO with SDF
>> implementation despite the type of input, where Read refers to
>> UnboundedSource.  One major pushback against using KafkaIO.Read as the source
>> description is that not all configurations of KafkaIO.Read are meaningful
>> to populate at execution time. Also, when thinking about x-lang usage,
>> making the source description work across language boundaries is also necessary.  As
>> Luke mentioned, it's quite easy to infer a Schema from an AutoValue object:
>> KafkaSourceDescription.java
>> <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java#L41>.
>> Then the coder of this schema-aware object will be a SchemaCoder
>> <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java#L84>.
>> When crossing language boundaries, it's also easy to convert a Row into the
>> source description: Convert.fromRows
>> <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1480>
>> .
>>
>>
>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <[email protected]> wrote:
>>
>>> To provide additional context, the KafkaIO ReadAll transform takes a
>>> PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is a POJO
>>> that contains the configurable parameters for reading from Kafka. This is
>>> different from the pattern that Ismael listed because they take
>>> PCollection<Read> as input and the Read is the same as the Read PTransform
>>> class used for the non read all case.
>>>
>>> The KafkaSourceDescriptor does lead to duplication since parameters used
>>> to configure the transform have to be copied over to the source descriptor
>>> but decouples how a transform is specified from the object that describes
>>> what needs to be done. I believe Ismael's point is that we wouldn't need
>>> such a decoupling.
>>>
>>> Another area that hasn't been discussed and I believe is a non-issue is
>>> that the Beam Java SDK has the most IO connectors and we would want to use
>>> the IO implementations within Beam Go and Beam Python. This brings in its
>>> own set of issues related to versioning and compatibility for the wire
>>> format and how one parameterizes such transforms. The wire format issue can
>>> be solved with either approach by making sure that the cross language
>>> expansion always takes the well known format (whatever it may be) and
>>> converts it into Read/KafkaSourceDescriptor/... object that is then passed
>>> to the ReadAll transform. Boyuan has been looking to make the
>>> KafkaSourceDescriptor have a schema so it can be represented as a row and
>>> this can be done easily using the AutoValue integration (I don't believe
>>> there is anything preventing someone from writing a schema row -> Read ->
>>> row adapter or also using the AutoValue configuration if the transform is
>>> also an AutoValue).
>>>
>>> I would be more for the code duplication and separation of concerns
>>> provided by using a different object to represent the contents of the
>>> PCollection from the pipeline construction time PTransform.
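
To make the descriptor-versus-Read trade-off quoted above concrete, here is a
plain-Java sketch with no Beam dependency. ReadConfig and SourceDescriptor are
hypothetical stand-ins for a Read transform and a KafkaSourceDescriptor-style
element; the field names, including the logProgress flag, are invented for
illustration and are not taken from KafkaIO.

```java
import java.util.Objects;

// Hypothetical element type stored in the PCollection: only the fields
// that are meaningful at execution time.
final class SourceDescriptor {
  final String servers;
  final String topic;
  final int partition;

  SourceDescriptor(String servers, String topic, int partition) {
    this.servers = servers;
    this.topic = topic;
    this.partition = partition;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof SourceDescriptor)) {
      return false;
    }
    SourceDescriptor other = (SourceDescriptor) o;
    return servers.equals(other.servers)
        && topic.equals(other.topic)
        && partition == other.partition;
  }

  @Override
  public int hashCode() {
    return Objects.hash(servers, topic, partition);
  }
}

// Hypothetical construction-time configuration, standing in for Read.
final class ReadConfig {
  final String servers;
  final String topic;
  final boolean logProgress; // hypothetical construction-time-only option

  ReadConfig(String servers, String topic, boolean logProgress) {
    this.servers = servers;
    this.topic = topic;
    this.logProgress = logProgress;
  }

  // The duplication mentioned above: shared parameters are copied over,
  // while construction-time-only options stay behind.
  SourceDescriptor toDescriptor(int partition) {
    return new SourceDescriptor(servers, topic, partition);
  }
}
```

The cost is the copy in toDescriptor; the gain is that the element type stays
a small value object with nothing that only makes sense at construction time.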
>>>
>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <[email protected]>
>>> wrote:
>>>
>>>> Hi Ismael,
>>>>
>>>> Thanks for taking this on. Have you considered an approach similar (or
>>>> dual) to FileIO.write(), where we in a sense also have to configure a
>>>> dynamic number of different IO transforms of the same type (file writes)?
>>>>
>>>> E.g. how in this example we configure many aspects of many file writes:
>>>>
>>>> transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
>>>>      .by(Transaction::getType)
>>>>      // Convert the data to be written to CSVSink
>>>>      .via(tx -> tx.getType().toFields(tx),
>>>>           type -> new CSVSink(type.getFieldNames()))
>>>>      .to(".../path/to/")
>>>>      .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
>>>>
>>>> we could do something similar for many JdbcIO reads:
>>>>
>>>> // User-specific type from which all the read parameters can be inferred
>>>> PCollection<Bar> bars;
>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>>>>   .fromQuery(bar -> ...compute query for this bar...)
>>>>   .withMapper((bar, resultSet) -> new Moo(...))
>>>>   .withBatchSize(bar -> ...compute batch size for this bar...)
>>>>   ...etc);
>>>>
>>>>
>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <[email protected]> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> (my apologies for the long email but this requires context)
>>>>>
>>>>> As part of the move from Source-based IOs to DoFn-based ones, one pattern
>>>>> emerged due to the composable nature of DoFn. The idea is to have a different
>>>>> kind of composable read where we take a PCollection of different sorts of
>>>>> intermediate specifications (e.g. tables, queries, etc.), for example:
>>>>>
>>>>> JdbcIO:
>>>>> ReadAll<ParameterT, OutputT> extends
>>>>> PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>>>>>
>>>>> RedisIO:
>>>>> ReadAll extends PTransform<PCollection<String>, PCollection<KV<String,
>>>>> String>>>
>>>>>
>>>>> HBaseIO:
>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>,
>>>>> PCollection<Result>>
>>>>>
>>>>> These patterns enabled richer use cases like doing multiple queries in the
>>>>> same Pipeline, querying based on key patterns or querying from multiple
>>>>> tables at the same time but came with some maintenance issues:
>>>>>
>>>>> - We ended up needing to add to the ReadAll transforms the parameters for
>>>>>   missing information, so we ended up with lots of duplicated with* methods
>>>>>   and error-prone code copied from the Read transforms into the ReadAll
>>>>>   transforms.
>>>>>
>>>>> - When you require new parameters you have to expand the input parameters of
>>>>>   the intermediate specification into something that resembles the full `Read`
>>>>>   definition. For example, imagine you want to read from multiple tables or
>>>>>   servers as part of the same pipeline but this was not in the intermediate
>>>>>   specification: you end up adding those extra methods (duplicating more
>>>>>   code) just to get close to the full Read spec.
>>>>>
>>>>> - If new parameters are added to the Read method we end up adding them
>>>>>   systematically to the ReadAll transform too so they are taken into account.
>>>>>
>>>>> Due to these issues I recently did a change to test a new approach that is
>>>>> simpler, more complete and maintainable. The code became:
>>>>>
>>>>> HBaseIO:
>>>>> ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>
>>>>>
>>>>> With this approach users benefit from improvements to the parameters of the
>>>>> normal Read because they can rely on the full Read parameters. But of course
>>>>> there are some minor caveats:
>>>>>
>>>>> 1. You need to push some information into normal Reads, for example
>>>>>    partition boundaries information or Restriction information (in the SDF
>>>>>    case).  Notice that this consistent approach of ReadAll produces a simple
>>>>>    pattern that ends up being almost reusable between IOs (e.g. the non-SDF
>>>>>    case):
>>>>>
>>>>>   public static class ReadAll
>>>>>       extends PTransform<PCollection<Read>, PCollection<SolrDocument>> {
>>>>>     @Override
>>>>>     public PCollection<SolrDocument> expand(PCollection<Read> input) {
>>>>>       return input
>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>>>>     }
>>>>>   }
>>>>>
>>>>> 2. If you are using generic types for the results of ReadAll you must have
>>>>>    the Coders used in its definition and require consistent types from the
>>>>>    data sources. In practice this means we need to add extra withCoder
>>>>>    method(s) on ReadAll but not the full specs.
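
The Split / Reshuffle / Read shape from caveat 1 can be modelled without Beam
as three list-processing stages. This is only a sketch under assumptions:
RangeSpec is a hypothetical read specification (a half-open key range), the
"read" just returns the keys, and a seeded shuffle stands in for Reshuffle,
which in a real pipeline redistributes elements across workers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical read spec: a half-open key range [start, end).
final class RangeSpec {
  final int start;
  final int end;

  RangeSpec(int start, int end) {
    this.start = start;
    this.end = end;
  }
}

class ReadAllSketch {
  // "Split": break one spec into sub-ranges of at most `size` keys.
  static List<RangeSpec> split(RangeSpec spec, int size) {
    List<RangeSpec> out = new ArrayList<>();
    for (int s = spec.start; s < spec.end; s += size) {
      out.add(new RangeSpec(s, Math.min(s + size, spec.end)));
    }
    return out;
  }

  // "Read": produce the records for one sub-range (here, just its keys).
  static List<Integer> read(RangeSpec spec) {
    List<Integer> out = new ArrayList<>();
    for (int k = spec.start; k < spec.end; k++) {
      out.add(k);
    }
    return out;
  }

  // Chains the three stages in the same order as the expand() above.
  static List<Integer> readAll(List<RangeSpec> input, int size, long seed) {
    List<RangeSpec> pieces = new ArrayList<>();
    for (RangeSpec spec : input) {
      pieces.addAll(split(spec, size));
    }
    // "Reshuffle" stand-in: reorder the pieces so no stage depends on
    // the original grouping of the input.
    Collections.shuffle(pieces, new Random(seed));
    List<Integer> results = new ArrayList<>();
    for (RangeSpec piece : pieces) {
      results.addAll(read(piece));
    }
    return results;
  }
}
```

The result contains every key of every input range exactly once, in an order
that depends on the shuffle, which mirrors why the pattern carries over
between IOs: only split and read are connector-specific.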
>>>>>
>>>>>
>>>>> At the moment HBaseIO and SolrIO already follow this ReadAll pattern. RedisIO
>>>>> and CassandraIO already have WIP PRs to do so. So I wanted to bring this
>>>>> subject to the mailing list to hear your opinions, and to find out if you see
>>>>> any sort of issues that we might be missing with this idea.
>>>>>
>>>>> Also I would like to see if we have consensus to start consistently using the
>>>>> terminology of ReadAll transforms based on Read and the readAll() method for
>>>>> new IOs (at this point undoing this in the only remaining inconsistent place,
>>>>> JdbcIO, might not be a good idea, but apart from this we should be ok).
>>>>>
>>>>> I mention this because the recent PR on KafkaIO based on SDF is doing
>>>>> something similar to the old pattern but is called ReadAll, and maybe it is
>>>>> worth being consistent for the benefit of users.
>>>>>
>>>>> Regards,
>>>>> Ismaël
>>>>>
>>>>
