Great news, thank you for working on this!

1. One of the requested features is to discover (and probably remove) new partitions and topics at runtime [1].
Do you expect any incompatible API changes from adding the SDF Read version, or is it going to be just another user API that can be used in parallel with the old one?

[1] https://issues.apache.org/jira/browse/BEAM-5786

> On 29 May 2020, at 04:16, Boyuan Zhang <[email protected]> wrote:
>
> Hi team,
>
> I'm Boyuan, currently working on building a Kafka read PTransform on top of
> SplittableDoFn [1][2][3]. There are two questions about Kafka usage I want to
> discuss with you:
>
> 1. Compared to KafkaIO.Read
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>,
> the SplittableDoFn Kafka version allows taking TopicPartition and
> startReadTime as elements and processing them at execution time, instead of
> configuring topics at pipeline construction time. I'm wondering whether
> there are other configurations we also want to populate at pipeline
> execution time instead of construction time. Taking these configurations as
> elements would add value when they could differ per TopicPartition. For the
> list of configurations we have now, please refer to KafkaIO.Read
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>.
>
> 2. I also want to offer a simple way for KafkaIO.Read to expand with the SDF
> version PTransform.
> Almost all configurations can be translated easily from KafkaIO.Read to the
> SDF version read, except a custom TimestampPolicyFactory (it's easy to
> translate built-in default types such as withProcessingTime
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L710>,
> withCreateTime
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L726>,
> and withLogAppendTime
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L699>).
> With SplittableDoFn, we have WatermarkEstimator
> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java>
> to track the watermark per TopicPartition. Thus, instead of a
> TimestampPolicyFactory
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java>,
> we need the user to provide a function which can extract the output
> timestamp from a KafkaRecord (like withTimestampFn
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>).
> My questions here are: are the default types enough for current KafkaIO.Read
> users? Is a custom TimestampPolicy really common? Is it okay to use the
> current API withTimestampFn
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>
> in KafkaIO.Read to accept the custom function and populate it to the SDF
> read transform?
>
> Thanks for your help!
>
> [1] https://beam.apache.org/blog/splittable-do-fn/
> [2] https://s.apache.org/splittable-do-fn
> [3] My prototype PR: https://github.com/apache/beam/pull/11749
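To make the timestamp question concrete: the three built-in policies already reduce to plain per-record extraction functions, which is exactly what a withTimestampFn-style parameter would carry into the SDF read. Below is a minimal, Beam-free sketch of that idea. The KafkaRecord class and its field names here are simplified stand-ins, not Beam's actual API; a real implementation would use Beam's SerializableFunction over org.apache.beam.sdk.io.kafka.KafkaRecord.

```java
import java.time.Instant;
import java.util.function.Function;

public class TimestampFnSketch {

    // Simplified stand-in for Beam's KafkaRecord: carries only the
    // fields needed to extract an output timestamp.
    public static final class KafkaRecord {
        public final long createTimeMillis;     // producer-assigned CreateTime
        public final long logAppendTimeMillis;  // broker-assigned LogAppendTime
        public KafkaRecord(long createTimeMillis, long logAppendTimeMillis) {
            this.createTimeMillis = createTimeMillis;
            this.logAppendTimeMillis = logAppendTimeMillis;
        }
    }

    // The three built-in timestamp policies expressed as plain extraction
    // functions, illustrating that a single withTimestampFn-style parameter
    // can cover the defaults as well as custom user logic.
    public static Function<KafkaRecord, Instant> processingTime() {
        return record -> Instant.now();
    }

    public static Function<KafkaRecord, Instant> createTime() {
        return record -> Instant.ofEpochMilli(record.createTimeMillis);
    }

    public static Function<KafkaRecord, Instant> logAppendTime() {
        return record -> Instant.ofEpochMilli(record.logAppendTimeMillis);
    }

    public static void main(String[] args) {
        KafkaRecord record = new KafkaRecord(1_000L, 2_000L);
        // Each policy is just a different function over the same record.
        System.out.println(createTime().apply(record).toEpochMilli());    // 1000
        System.out.println(logAppendTime().apply(record).toEpochMilli()); // 2000
    }
}
```

If this reduction holds for the built-ins, then only genuinely custom TimestampPolicy implementations (those that need per-partition state beyond the record itself) would be left out, which is the open question above.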
