Sorry for the typo. I meant that I think we can go with *(3)* and (4): use a
schema-aware data type as the input of ReadAll.
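
To make the idea concrete, here is a minimal sketch in plain Java with no Beam
dependency. The names SourceDescriptor, toRow and fromRow are hypothetical, and
a Map<String, Object> only stands in for a Beam Row; in Beam the schema would
presumably be inferred from an AutoValue class and encoded with a SchemaCoder,
as described further down the thread. The sketch only illustrates why a flat
descriptor with named fields is easier to pass across language boundaries, and
to evolve, than a PTransform.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical descriptor: plain data only, no construction-time transform state.
final class SourceDescriptor {
    final String topic;
    final int partition;
    final Long startOffset; // nullable: null means "use the default start position"

    SourceDescriptor(String topic, int partition, Long startOffset) {
        this.topic = Objects.requireNonNull(topic, "topic");
        this.partition = partition;
        this.startOffset = startOffset;
    }

    // Flatten to named fields of simple types (an assumed stand-in for a Beam Row).
    Map<String, Object> toRow() {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("topic", topic);
        row.put("partition", partition);
        row.put("startOffset", startOffset);
        return row;
    }

    // Rebuild from a row. Fields are looked up by name, so a row that lacks
    // the optional "startOffset" field still decodes (the field becomes null).
    static SourceDescriptor fromRow(Map<String, Object> row) {
        return new SourceDescriptor(
            (String) row.get("topic"),
            (Integer) row.get("partition"),
            (Long) row.get("startOffset"));
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SourceDescriptor)) return false;
        SourceDescriptor that = (SourceDescriptor) o;
        return topic.equals(that.topic)
            && partition == that.partition
            && Objects.equals(startOffset, that.startOffset);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition, startOffset);
    }
}
```

A descriptor built in one SDK can then be rebuilt elsewhere with
SourceDescriptor.fromRow(descriptor.toRow()), and an optional field added later
does not break rows written without it.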

On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang <[email protected]> wrote:

> Thanks for the summary, Cham!
>
> I think we can go with (2) and (4): use the data type that is schema-aware
> as the input of ReadAll.
>
> Converting Read into ReadAll helps us to stick with SDF-like IO. But having
> only (3) is not enough to solve the problem of using ReadAll in the x-lang
> case.
>
> The key point of ReadAll is that its input type should be able to cross
> language boundaries and remain compatible across updates and downgrades.
> After investigating some possibilities (pure Java POJO with custom coder,
> protobuf, row/schema) in the Kafka usage, we found that row/schema fits our
> needs best. This is where (4) comes in. I believe that using Read as the
> input of ReadAll makes sense in some cases, but I also think not all IOs
> have the same need. I would treat Read as a special type as long as the
> Read is schema-aware.
>
> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>> I see. So it seems like there are three options discussed so far when it
>> comes to defining source descriptors for ReadAll type transforms:
>>
>> (1) Use Read PTransform as the element type of the input PCollection
>> (2) Use a POJO that describes the source as the data element of the input
>> PCollection
>> (3) Provide a converter as a function to the Read transform which
>> essentially will convert it to a ReadAll (what Eugene mentioned)
>>
>> I feel like (3) is more suitable for a related set of source descriptions
>> such as files.
>> (1) will allow the most code reuse but seems likely to make it hard to use
>> the ReadAll transform as a cross-language transform and will break the
>> separation of construction-time and runtime constructs
>> (2) could result in less code reuse if we are not careful but will make the
>> transform easier to use as a cross-language transform without
>> additional modifications
>>
>> Also, with SDF, we can create ReadAll-like transforms that are more
>> efficient. So we might be able to just define all sources in that format
>> and make Read transforms just an easy-to-use composite built on top of that
>> (by adding a preceding Create transform).
>>
>> Thanks,
>> Cham
>>
>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik <[email protected]> wrote:
>>
>>> I believe we do require PTransforms to be serializable since anonymous
>>> DoFns typically capture the enclosing PTransform.
>>>
>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <
>>> [email protected]> wrote:
>>>
>>>> Seems like Read in PCollection<Read> refers to a transform, at least
>>>> here:
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>>
>>>> I'm in favour of separating construction time transforms from execution
>>>> time data objects that we store in PCollections as Luke mentioned. Also, we
>>>> don't guarantee that PTransform is serializable so users have the
>>>> additional complexity of providing a coder whenever a PTransform is used
>>>> as a data object.
>>>> Also, I agree with Boyuan that using simple Java objects that are
>>>> convertible to Beam Rows allows us to make these transforms available to
>>>> other SDKs through the cross-language transforms. Using transforms or
>>>> complex sources as data objects will probably make this difficult.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>>
>>>>
>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Ismael,
>>>>>
>>>>> I think the ReadAll in the IO connector refers to the IO with an SDF
>>>>> implementation regardless of the input type, whereas Read refers to
>>>>> UnboundedSource. One major objection to using KafkaIO.Read as the source
>>>>> description is that not all configurations of KafkaIO.Read are meaningful
>>>>> to populate at execution time. Also, when thinking about x-lang usage,
>>>>> the source description needs to be able to cross language boundaries.
>>>>> As Luke mentioned, it's quite easy to infer a Schema from an AutoValue
>>>>> object: KafkaSourceDescription.java
>>>>> <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java#L41>.
>>>>> Then the coder of this schema-aware object will be a SchemaCoder
>>>>> <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java#L84>.
>>>>> When crossing language boundaries, it's also easy to convert a Row into
>>>>> the source description: Convert.fromRows
>>>>> <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1480>.
>>>>>
>>>>>
>>>>> On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik <[email protected]> wrote:
>>>>>
>>>>>> To provide additional context, the KafkaIO ReadAll transform takes a
>>>>>> PCollection<KafkaSourceDescriptor>. This KafkaSourceDescriptor is a POJO
>>>>>> that contains the configurable parameters for reading from Kafka. This is
>>>>>> different from the pattern that Ismael listed because they take
>>>>>> PCollection<Read> as input and the Read is the same as the Read
>>>>>> PTransform class used for the non-ReadAll case.
>>>>>>
>>>>>> The KafkaSourceDescriptor does lead to duplication since parameters
>>>>>> used to configure the transform have to be copied over to the source
>>>>>> descriptor but decouples how a transform is specified from the object
>>>>>> that describes what needs to be done. I believe Ismael's point is that
>>>>>> we wouldn't need such a decoupling.
>>>>>>
>>>>>> Another area that hasn't been discussed and I believe is a non-issue
>>>>>> is that the Beam Java SDK has the most IO connectors and we would want to
>>>>>> use the IO implementations within Beam Go and Beam Python. This brings in
>>>>>> its own set of issues related to versioning and compatibility for the
>>>>>> wire format and how one parameterizes such transforms. The wire format
>>>>>> issue can be solved with either approach by making sure that the cross
>>>>>> language expansion always takes the well known format (whatever it may
>>>>>> be) and converts it into Read/KafkaSourceDescriptor/... object that is
>>>>>> then passed to the ReadAll transform. Boyuan has been looking to make the
>>>>>> KafkaSourceDescriptor have a schema so it can be represented as a row and
>>>>>> this can be done easily using the AutoValue integration (I don't believe
>>>>>> there is anything preventing someone from writing a schema row -> Read ->
>>>>>> row adapter or also using the AutoValue configuration if the transform is
>>>>>> also an AutoValue).
>>>>>>
>>>>>> I would be more for the code duplication and separation of concerns
>>>>>> provided by using a different object to represent the contents of the
>>>>>> PCollection from the pipeline construction time PTransform.
>>>>>>
>>>>>> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi Ismael,
>>>>>>>
>>>>>>> Thanks for taking this on. Have you considered an approach similar
>>>>>>> (or dual) to FileIO.write(), where we in a sense also have to configure
>>>>>>> a dynamic number of different IO transforms of the same type (file
>>>>>>> writes)?
>>>>>>>
>>>>>>> E.g. how in this example we configure many aspects of many file
>>>>>>> writes:
>>>>>>>
>>>>>>> transactions.apply(FileIO.<TransactionType, Transaction>writeDynamic()
>>>>>>>      .by(Transaction::getType)
>>>>>>>      // Convert the data to be written to CSVSink
>>>>>>>      .via(tx -> tx.getType().toFields(tx),
>>>>>>>           type -> new CSVSink(type.getFieldNames()))
>>>>>>>      .to(".../path/to/")
>>>>>>>      .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
>>>>>>>
>>>>>>> we could do something similar for many JdbcIO reads:
>>>>>>>
>>>>>>> // user-specific type from which all the read parameters can be inferred
>>>>>>> PCollection<Bar> bars;
>>>>>>> PCollection<Moo> moos = bars.apply(JdbcIO.<Bar, Moo>readAll()
>>>>>>>   .fromQuery(bar -> ...compute query for this bar...)
>>>>>>>   .withMapper((bar, resultSet) -> new Moo(...))
>>>>>>>   .withBatchSize(bar -> ...compute batch size for this bar...)
>>>>>>>   ...etc);
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> (my apologies for the long email but this requires context)
>>>>>>>>
>>>>>>>> As part of the move from Source-based IOs to DoFn-based ones, one
>>>>>>>> pattern emerged due to the composable nature of DoFn. The idea is to
>>>>>>>> have a different kind of composable read where we take a PCollection
>>>>>>>> of different sorts of intermediate specifications, e.g. tables,
>>>>>>>> queries, etc. For example:
>>>>>>>>
>>>>>>>> JdbcIO:
>>>>>>>> ReadAll<ParameterT, OutputT> extends
>>>>>>>> PTransform<PCollection<ParameterT>, PCollection<OutputT>>
>>>>>>>>
>>>>>>>> RedisIO:
>>>>>>>> ReadAll extends PTransform<PCollection<String>,
>>>>>>>> PCollection<KV<String, String>>>
>>>>>>>>
>>>>>>>> HBaseIO:
>>>>>>>> ReadAll extends PTransform<PCollection<HBaseQuery>,
>>>>>>>> PCollection<Result>>
>>>>>>>>
>>>>>>>> These patterns enabled richer use cases like doing multiple queries
>>>>>>>> in the same Pipeline, querying based on key patterns or querying from
>>>>>>>> multiple tables at the same time, but came with some maintenance
>>>>>>>> issues:
>>>>>>>>
>>>>>>>> - We ended up needing to add parameters to the ReadAll transforms for
>>>>>>>>   missing information, so we ended up with lots of duplicated `with`
>>>>>>>>   methods and error-prone code copied from the Read transforms into
>>>>>>>>   the ReadAll transforms.
>>>>>>>>
>>>>>>>> - When you require new parameters you have to expand the input
>>>>>>>>   parameters of the intermediary specification into something that
>>>>>>>>   resembles the full `Read` definition. For example, imagine you want
>>>>>>>>   to read from multiple tables or servers as part of the same pipeline
>>>>>>>>   but this was not in the intermediate specification: you end up
>>>>>>>>   adding those extra methods (duplicating more code) just to get close
>>>>>>>>   to the full Read spec.
>>>>>>>>
>>>>>>>> - If new parameters are added to the Read method we end up adding them
>>>>>>>>   systematically to the ReadAll transform too so they are taken into
>>>>>>>>   account.
>>>>>>>>
>>>>>>>> Due to these issues I recently made a change to test a new approach
>>>>>>>> that is simpler, more complete and more maintainable. The code became:
>>>>>>>>
>>>>>>>> HBaseIO:
>>>>>>>> ReadAll extends PTransform<PCollection<Read>, PCollection<Result>>
>>>>>>>>
>>>>>>>> With this approach users benefit from improvements to the parameters
>>>>>>>> of the normal Read because they can rely on the full set of Read
>>>>>>>> parameters. But of course there are some minor caveats:
>>>>>>>>
>>>>>>>> 1. You need to push some information into normal Reads, for example
>>>>>>>>    partition boundary information or Restriction information (in the
>>>>>>>>    SDF case). Notice that this consistent approach of ReadAll produces
>>>>>>>>    a simple pattern that ends up being almost reusable between IOs
>>>>>>>>    (e.g. the non-SDF case):
>>>>>>>>
>>>>>>>>   public static class ReadAll
>>>>>>>>       extends PTransform<PCollection<Read>, PCollection<SolrDocument>> {
>>>>>>>>     @Override
>>>>>>>>     public PCollection<SolrDocument> expand(PCollection<Read> input) {
>>>>>>>>       return input
>>>>>>>>           .apply("Split", ParDo.of(new SplitFn()))
>>>>>>>>           .apply("Reshuffle", Reshuffle.viaRandomKey())
>>>>>>>>           .apply("Read", ParDo.of(new ReadFn()));
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>>
>>>>>>>> 2. If you are using generic types for the results of ReadAll, you must
>>>>>>>>    have the Coders used in its definition and require consistent types
>>>>>>>>    from the data sources. In practice this means we need to add extra
>>>>>>>>    withCoder method(s) on ReadAll but not the full specs.
>>>>>>>>
>>>>>>>>
>>>>>>>> At the moment HBaseIO and SolrIO already follow this ReadAll pattern.
>>>>>>>> RedisIO and CassandraIO already have WIP PRs to do so. So I wanted to
>>>>>>>> bring this subject to the mailing list to hear your opinions, and to
>>>>>>>> find out if you see any sort of issues that we might be missing with
>>>>>>>> this idea.
>>>>>>>>
>>>>>>>> Also I would like to see if we have consensus to start consistently
>>>>>>>> using the terminology of ReadAll transforms based on Read and the
>>>>>>>> readAll() method for new IOs (at this point, undoing this in the only
>>>>>>>> remaining inconsistent place, JdbcIO, might not be a good idea, but
>>>>>>>> apart from this we should be ok).
>>>>>>>>
>>>>>>>> I mention this because the recent PR on KafkaIO based on SDF is doing
>>>>>>>> something similar to the old pattern but is called ReadAll, and maybe
>>>>>>>> it is worth being consistent for the benefit of users.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Ismaël
>>>>>>>>
>>>>>>>
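
As a rough illustration of Cham's suggestion earlier in the thread that Read
could be just a composite built on top of ReadAll (by adding a preceding
Create), here is a self-contained sketch in plain Java. TableIO and its methods
are hypothetical, and java.util.List only stands in for a PCollection; this is
not Beam code, just the shape of the delegation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory "IO": each table is a named list of string records.
final class TableIO {
    private final Map<String, List<String>> tables;

    TableIO(Map<String, List<String>> tables) {
        this.tables = tables;
    }

    // ReadAll: takes one input element (here a table name) per source to read
    // and emits the concatenated records. Unknown tables contribute nothing.
    List<String> readAll(List<String> tableNames) {
        List<String> out = new ArrayList<>();
        for (String name : tableNames) {
            out.addAll(tables.getOrDefault(name, Collections.<String>emptyList()));
        }
        return out;
    }

    // Read: a thin composite that creates a single descriptor and delegates
    // to ReadAll, so all read logic lives in one place.
    List<String> read(String tableName) {
        return readAll(Collections.singletonList(tableName));
    }
}
```

With this shape, any parameter added to the descriptor is available to both
entry points without duplicating it on a separate Read definition.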
