Thanks for the summary, Cham!

I think we can go with (2) and (4): use a schema-aware data type as the
input of ReadAll.

Converting Read into ReadAll helps us stick with SDF-like IO. But only
having (3) is not enough to solve the problem of using ReadAll in the x-lang
case.

The key point of ReadAll is that its input type should be able to cross
language boundaries and remain compatible across updates/downgrades. After
investigating several possibilities (a plain Java POJO with a custom coder,
protobuf, row/schema) for the Kafka use case, we found that row/schema fits
our needs best. That is where (4) comes from. I believe that using Read as
the input of ReadAll makes sense in some cases, but not all IOs have the
same need. I would treat Read as a special case of such an input type, as
long as the Read is schema-aware.
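
For concreteness, here is a minimal sketch of what such a schema-aware
descriptor could look like (the class and field names are illustrative, not
the actual KafkaSourceDescription):

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

// A Beam schema (and therefore a SchemaCoder) is inferred from the AutoValue
// fields, so instances can be encoded as Rows and passed across language
// boundaries.
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class MySourceDescription {
  public abstract String getTopic();
  public abstract Integer getPartition();

  public static MySourceDescription of(String topic, Integer partition) {
    return new AutoValue_MySourceDescription(topic, partition);
  }
}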

On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <[email protected]>
wrote:

> I see. So it seems like there are three options discussed so far when it
> comes to defining source descriptors for ReadAll-type transforms:
>
> (1) Use Read PTransform as the element type of the input PCollection
> (2) Use a POJO that describes the source as the data element of the input
> PCollection
> (3) Provide a converter as a function to the Read transform which
> essentially will convert it to a ReadAll (what Eugene mentioned)
>
> I feel like (3) is more suitable for a related set of source descriptions
> such as files.
> (1) will allow the most code reuse but seems like it will make it hard to use
> the ReadAll transform as a cross-language transform and will break the
> separation of construction-time and runtime constructs.
> (2) could result in less code reuse if not careful but will make the
> transform easier to use as a cross-language transform without
> additional modifications.
>
> Also, with SDF, we can create ReadAll-like transforms that are more
> efficient. So we might be able to just define all sources in that format
> and make Read transforms just an easy-to-use composite built on top of that
> (by adding a preceding Create transform).
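>
> A rough sketch of that composite shape (the class and method names below
> are placeholders, not actual Beam APIs):
>
> // Read as a thin composite: wrap this transform's single source
> // descriptor with Create, then delegate to the SDF-based ReadAll for the
> // actual reading.
> public PCollection<Result> expand(PBegin input) {
>   return input
>       .apply(Create.of(buildSourceDescriptor()))  // hypothetical helper
>       .apply(new ReadAll());
> }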
>
> Thanks,
> Cham
>
> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <[email protected]> wrote:
>
>> I believe we do require PTransforms to be serializable since anonymous
>> DoFns typically capture the enclosing PTransform.
>>
>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>> Seems like Read in PCollection<Read> refers to a transform, at least
>>> here:
>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>
>>> I'm in favour of separating construction-time transforms from the
>>> execution-time data objects that we store in PCollections, as Luke
>>> mentioned. Also, we don't guarantee that a PTransform is serializable, so
>>> users have the additional complexity of providing a coder whenever a
>>> PTransform is used as a data object.
>>> Also, I agree with Boyuan that using simple Java objects that are
>>> convertible to Beam Rows allows us to make these transforms available to
>>> other SDKs through cross-language transforms. Using transforms or complex
>>> sources as data objects will probably make this difficult.
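>>>
>>> To illustrate the coder point with a rough sketch: if we did put a
>>> transform-like object into a PCollection, the coder would have to be
>>> supplied explicitly (and that only works at all if the object happens to
>>> be Serializable), e.g.:
>>>
>>> // Hypothetical: using a Read object as a data element forces an explicit
>>> // coder choice such as Java serialization.
>>> PCollection<HBaseIO.Read> reads =
>>>     pipeline.apply(
>>>         Create.of(read1, read2)
>>>             .withCoder(SerializableCoder.of(HBaseIO.Read.class)));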
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>
>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <[email protected]>
>>> wrote:
>>>
>>>> Hi Ismael,
>>>>
>>>> I think the ReadAll in the IO connector refers to the SDF-based
>>>> implementation regardless of the input type, whereas Read refers to
>>>> UnboundedSource. One major pushback against using KafkaIO.Read as the
>>>> source description is that not all configurations of KafkaIO.Read are
>>>> meaningful to populate during execution time. Also, when thinking about
>>>> x-lang usage, making the source description work across language
>>>> boundaries is necessary. As Luke mentioned, it's quite easy to infer a
>>>> Schema from an AutoValue object:
>>>> KafkaSourceDescription.java
>>>> <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java#L41>.
>>>> Then the coder of this schema-aware object will be a SchemaCoder
>>>> <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java#L84>.
>>>> When crossing language boundaries, it's also easy to convert a Row into the
>>>> source description: Convert.fromRows
>>>> <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1480>
>>>> .
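>>>>
>>>> Roughly, and only as an illustrative sketch rather than the exact code in
>>>> the links above, that conversion looks like:
>>>>
>>>> // rows: a PCollection<Row>, e.g. produced by a pipeline in another SDK.
>>>> // The schema (and SchemaCoder) of KafkaSourceDescription is inferred
>>>> // from the @DefaultSchema(AutoValueSchema.class) annotation, so Rows can
>>>> // be converted back into typed descriptors.
>>>> PCollection<KafkaSourceDescription> descriptions =
>>>>     rows.apply(Convert.fromRows(KafkaSourceDescription.class));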
>>>>
>>>>
>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> To provide additional context, the KafkaIO ReadAll transform takes a
>>>>> PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is a POJO
>>>>> that contains the configurable parameters for reading from Kafka. This is
>>>>> different from the pattern that Ismael listed because they take
>>>>> PCollection<Read> as input and the Read is the same as the Read PTransform
>>>>> class used for the non-ReadAll case.
>>>>>
>>>>> The KafkaSourceDescriptor does lead to duplication since parameters
>>>>> used to configure the transform have to be copied over to the source
>>>>> descriptor, but it decouples how a transform is specified from the object that
>>>>> describes what needs to be done. I believe Ismael's point is that we
>>>>> wouldn't need such a decoupling.
>>>>>
>>>>> Another area that hasn't been discussed and I believe is a non-issue
>>>>> is that the Beam Java SDK has the most IO connectors and we would want to
>>>>> use the IO implementations within Beam Go and Beam Python. This brings in
>>>>> its own set of issues related to versioning and compatibility for the wire
>>>>> format and how one parameterizes such transforms. The wire format issue can
>>>>> be solved with either approach by making sure that the cross language
>>>>> expansion always takes the well known format (whatever it may be) and
>>>>> converts it into Read/KafkaSourceDescriptor/... object that is then passed
>>>>> to the ReadAll transform. Boyuan has been looking to make the
>>>>> KafkaSourceDescriptor have a schema so it can be represented as a row and
>>>>> this can be done easily using the AutoValue integration (I don't believe
>>>>> there is anything preventing someone from writing a schema row -> Read ->
>>>>> row adapter or also using the AutoValue configuration if the transform is
>>>>> also an AutoValue).
>>>>>
>>>>> I would be more for the code duplication and separation of concerns
>>>>> provided by using a different object to represent the contents of the
>>>>> PCollection, separate from the pipeline-construction-time PTransform.
>>>>>
>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Ismael,
>>>>>>
>>>>>> Thanks for taking this on. Have you considered an approach similar
>>>>>> (or dual) to FileIO.write(), where we in a sense also have to configure a
>>>>>> dynamic number of different IO transforms of the same type (file writes)?
>>>>>>
>>>>>> E.g. how in this example we configure many aspects of many file
>>>>>> writes:
>>>>>>
>>>>>> transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
>>>>>>      .by(Transaction::getType)
>>>>>>      .via(tx -> tx.getType().toFields(tx),  // Convert the data to be written to CSVSink
>>>>>>           type -> new CSVSink(type.getFieldNames()))
>>>>>>      .to(".../path/to/")
>>>>>>      .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
>>>>>>
>>>>>> we could do something similar for many JdbcIO reads:
>>>>>>
>>>>>> PCollection<Bar> bars;  // user-specific type from which all the read parameters can be inferred
>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>>>>>>   .fromQuery(bar -> ...compute query for this bar...)
>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
>>>>>>   .withBatchSize(bar -> ...compute batch size for this bar...)
>>>>>>   ...etc);
>>>>>>
>>>>>>
>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> (my apologies for the long email, but this requires context)
>>>>>>>
>>>>>>> As part of the move from Source-based IOs to DoFn-based ones, one pattern
>>>>>>> emerged due to the composable nature of DoFn. The idea is to have a
>>>>>>> different kind of composable read where we take a PCollection of different
>>>>>>> sorts of intermediate specifications, e.g. tables, queries, etc. For example:
>>>>>>>
>>>>>>> JdbcIO:
>>>>>>> ReadAll<ParameterT, OutputT> extends PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>>>>>>>
>>>>>>> RedisIO:
>>>>>>> ReadAll extends PTransform<PCollection<String>, PCollection<KV<String, String>>>
>>>>>>>
>>>>>>> HBaseIO:
>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>, PCollection<Result>>
>>>>>>>
>>>>>>> These patterns enabled richer use cases like doing multiple queries in the
>>>>>>> same Pipeline, querying based on key patterns, or querying from multiple
>>>>>>> tables at the same time, but they came with some maintenance issues:
>>>>>>>
>>>>>>> - We ended up needing to add the parameters for missing information to the
>>>>>>>   ReadAll transforms, so we ended up with lots of duplicated `with` methods
>>>>>>>   and error-prone code copied from the Read transforms into the ReadAll
>>>>>>>   transforms.
>>>>>>>
>>>>>>> - When you require new parameters you have to expand the input parameters of
>>>>>>>   the intermediary specification into something that resembles the full
>>>>>>>   `Read` definition. For example, imagine you want to read from multiple
>>>>>>>   tables or servers as part of the same pipeline, but this was not in the
>>>>>>>   intermediate specification: you end up adding those extra methods
>>>>>>>   (duplicating more code) just to get close to the full `Read` spec.
>>>>>>>
>>>>>>> - If new parameters are added to Read, we end up having to add them
>>>>>>>   systematically to the ReadAll transform too so they are taken into account.
>>>>>>>
>>>>>>> Due to these issues I recently made a change to test a new approach that is
>>>>>>> simpler, more complete, and more maintainable. The code became:
>>>>>>>
>>>>>>> HBaseIO:
>>>>>>> ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>
>>>>>>>
>>>>>>> With this approach users gain the benefit of improvements to the parameters
>>>>>>> of the normal Read because they have the full set of Read parameters
>>>>>>> available. But of course there are some minor caveats:
>>>>>>>
>>>>>>> 1. You need to push some information into the normal Read, for example
>>>>>>>    partition boundary information or Restriction information (in the SDF
>>>>>>>    case). Notice that this consistent approach to ReadAll produces a simple
>>>>>>>    pattern that ends up being almost reusable between IOs (e.g. the non-SDF
>>>>>>>    case):
>>>>>>>
>>>>>>>   public static class ReadAll
>>>>>>>       extends PTransform<PCollection<Read>, PCollection<SolrDocument>> {
>>>>>>>     @Override
>>>>>>>     public PCollection<SolrDocument> expand(PCollection<Read> input) {
>>>>>>>       return input
>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>>>>>>     }
>>>>>>>   }
>>>>>>>
>>>>>>> 2. If you are using generic types for the ReadAll results, you must have the
>>>>>>>    Coders used in its definition and require consistent types from the data
>>>>>>>    sources; in practice this means we need to add extra withCoder method(s)
>>>>>>>    on ReadAll, but not the full specs.
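>>>>>>>
>>>>>>>    A simplified sketch of what such a generic ReadAll with a withCoder
>>>>>>>    method can look like (ReadFn is a placeholder for the IO's actual read
>>>>>>>    DoFn; real IOs would typically build this with AutoValue):
>>>>>>>
>>>>>>>   public static class ReadAll<ParameterT, OutputT>
>>>>>>>       extends PTransform<PCollection<ParameterT>, PCollection<OutputT>> {
>>>>>>>     private Coder<OutputT> coder;
>>>>>>>
>>>>>>>     // The output coder cannot be inferred from the generic OutputT, so it
>>>>>>>     // is provided on the ReadAll itself rather than via a full Read spec.
>>>>>>>     public ReadAll<ParameterT, OutputT> withCoder(Coder<OutputT> coder) {
>>>>>>>       this.coder = coder;
>>>>>>>       return this;
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public PCollection<OutputT> expand(PCollection<ParameterT> input) {
>>>>>>>       return input
>>>>>>>           .apply("Read", ParDo.of(new ReadFn<ParameterT, OutputT>()))
>>>>>>>           .setCoder(coder);
>>>>>>>     }
>>>>>>>   }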
>>>>>>>
>>>>>>>
>>>>>>> At the moment HBaseIO and SolrIO already follow this ReadAll pattern, and
>>>>>>> RedisIO and CassandraIO already have WIP PRs to do so. So I wanted to bring
>>>>>>> this subject to the mailing list to hear your opinions, and to see if there
>>>>>>> are any issues that we might be missing with this idea.
>>>>>>>
>>>>>>> Also, I would like to see if we have consensus to start consistently using
>>>>>>> the terminology of ReadAll transforms based on Read, and the readAll()
>>>>>>> method, for new IOs (at this point probably redoing this in the only
>>>>>>> remaining inconsistent place, JdbcIO, might not be a good idea, but apart
>>>>>>> from this we should be ok).
>>>>>>>
>>>>>>> I mention this because the recent PR on KafkaIO based on SDF is doing
>>>>>>> something similar to the old pattern but is called ReadAll, and maybe it is
>>>>>>> worth being consistent for the benefit of users.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Ismaël
>>>>>>>
>>>>>>
