Great news, thank you for working on this! 

1. One of the most requested features is the ability to discover new 
partitions and topics at runtime, and probably to drop removed ones [1].
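
To make that request a bit more concrete, here is a minimal sketch of the diff step such a discovery loop would need: compare the partitions we already read from with the ones currently reported, and derive what was added and what was removed. Everything here is an assumption for illustration. PartitionDiff and its methods are hypothetical names, not part of KafkaIO or the prototype PR, and partitions are modeled as plain strings like "orders-0" rather than Kafka's TopicPartition so the snippet has no dependencies.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper for illustration only; not part of KafkaIO or the prototype PR. */
final class PartitionDiff {

  /** Returns partitions present in {@code current} but not in {@code known}. */
  static Set<String> added(Set<String> known, Set<String> current) {
    Set<String> result = new TreeSet<>(current); // sorted copy, inputs stay untouched
    result.removeAll(known);
    return result;
  }

  /** Returns partitions present in {@code known} but no longer in {@code current}. */
  static Set<String> removed(Set<String> known, Set<String> current) {
    Set<String> result = new TreeSet<>(known);
    result.removeAll(current);
    return result;
  }

  public static void main(String[] args) {
    // Assumed naming scheme "<topic>-<partition>", purely for the example.
    Set<String> known = Set.of("orders-0", "orders-1");
    Set<String> current = Set.of("orders-1", "orders-2");
    System.out.println("added=" + added(known, current));
    System.out.println("removed=" + removed(known, current));
  }
}
```

In an SDF-based read, the added set could be emitted as new input elements, while the removed set would tell the corresponding readers to stop. How the current set is polled is left open here.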


Do you expect any incompatible API changes from adding the SDF Read version, or 
is it going to be just another user API that can be used in parallel with the 
old one?


[1] https://issues.apache.org/jira/browse/BEAM-5786


> On 29 May 2020, at 04:16, Boyuan Zhang <[email protected]> wrote:
> 
> Hi team,
> 
> I'm Boyuan, currently working on building a Kafka read PTransform on top of 
> SplittableDoFn[1][2][3]. There are two questions about Kafka usage I want to 
> discuss with you:
> 
> 1.  Compared to the KafkaIO.Read 
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>,
>  the SplittableDoFn Kafka version allows taking TopicPartition and 
> startReadTime as elements and processing them during execution time, instead 
> of configuring topics at pipeline construction time. I'm wondering whether 
> there are other configurations we also want to populate during pipeline 
> execution time instead of construction time. Taking these configurations as 
> elements would make sense when they can differ between 
> TopicPartitions. For a list of configurations we have now, please refer to 
> KafkaIO.Read 
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>.
>  
> 
> 2. I also want to offer a simple way for KafkaIO.Read to expand with the SDF 
> version PTransform. Almost all configurations can be translated easily from 
> KafkaIO.Read to the SDF version read except custom TimestampPolicyFactory 
> (It's easy to translate built-in default types such as withProcessingTime 
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L710>,
>  withCreateTime 
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L726>
>  and withLogAppendTime 
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L699>.).
>  With SplittableDoFn, we have WatermarkEstimator 
> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java>
>  to track watermark per TopicPartition. Thus, instead of 
> TimestampPolicyFactory 
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java>
>  , we need the user to provide a function that can extract the output 
> timestamp from a KafkaRecord (like withTimestampFn 
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>).
>  My question here is: are the default types enough for current KafkaIO.Read 
> users? Is a custom TimestampPolicy really that common? Is it okay to use the 
> current API withTimestampFn 
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>
>  in KafkaIO.Read to accept the custom function and pass it through to the SDF 
> read transform?
> 
> Thanks for your help!
> 
> [1] https://beam.apache.org/blog/splittable-do-fn/
> [2] https://s.apache.org/splittable-do-fn
> [3] My prototype PR https://github.com/apache/beam/pull/11749
