Re Alexey:

> 1. One of the demanded features is to discover (and probably remove) new
> partitions and topics at runtime [1].

Yes, that's the major motivation for us to build the Kafka read on top of
SplittableDoFn.

> Do you expect any non-compatible API changes by adding SDF Read version or
> it’s going to be just another user API that can be used in parallel with
> old one?

I want to create a ReadFromKafkaViaSDF<input = TopicPartition, output =
KafkaRecord> PTransform. At construction time, this transform requires the
Kafka consumer configuration and a user-defined function that extracts the
output timestamp from a KafkaRecord (or, as I mentioned in the first thread,
we can treat the configuration as input if it could differ between
TopicPartitions). From the user's perspective, there are two ways to use
ReadFromKafkaViaSDF:

1. I'll add a new API, useSDFTransform(), to KafkaIO.Read. When this flag is
   set, ReadFromKafkaViaSDF will be added to the pipeline. The problem here
   is translating a custom TimestampPolicy into the user-defined function, so
   I want to ask whether it's feasible to add another new API,
   extractTimestampFn() (or reuse withTimestampFn()), to KafkaIO.Read.
2. Or the user can choose to rewrite their pipeline with ReadFromKafkaViaSDF
   directly. Here we require the user to write a new function that extracts
   the output timestamp.

Re Reuven:

> Are you able to support CustomTimestampPolicyWithLimitedDelay?

For the new transform ReadFromKafkaViaSDF, yes, as long as the user provides
the timestamp function. If the user wants to use the SDF transform via
KafkaIO.Read, we need to expose new APIs on KafkaIO.Read that accept the
timestamp function and the type of policy.

On Fri, May 29, 2020 at 10:58 AM Reuven Lax <[email protected]> wrote:

> Are you able to support CustomTimestampPolicyWithLimitedDelay?
>
> On Fri, May 29, 2020 at 9:58 AM Boyuan Zhang <[email protected]> wrote:
>
>> Yes, the WatermarkEstimator tracks output timestamp and gives an estimate
>> of watermark per partition.
>> The problem is, a user can configure KafkaIO.Read() with a custom
>> TimestampPolicy, which cannot be translated into a simple function which
>> extracts the timestamp from a KafkaRecord. In this case, I need to either
>> ask the user to write such a function or just support the built-in types.
>>
>> On Thu, May 28, 2020 at 9:12 PM Reuven Lax <[email protected]> wrote:
>>
>>> This is per-partition, right? In that case I assume it will match the
>>> current Kafka watermark.
>>>
>>> On Thu, May 28, 2020 at 9:03 PM Boyuan Zhang <[email protected]> wrote:
>>>
>>>> Hi Reuven,
>>>>
>>>> I'm going to use MonotonicallyIncreasing
>>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java#L105>
>>>> by default and in the future, we may want to support a custom kind if
>>>> there is a request.
>>>>
>>>> On Thu, May 28, 2020 at 8:54 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Which WatermarkEstimator do you think should be used?
>>>>>
>>>>> On Thu, May 28, 2020 at 7:17 PM Boyuan Zhang <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi team,
>>>>>>
>>>>>> I'm Boyuan, currently working on building a Kafka read PTransform on
>>>>>> top of SplittableDoFn[1][2][3]. There are two questions about Kafka
>>>>>> usage I want to discuss with you:
>>>>>>
>>>>>> 1. Compared to the KafkaIO.Read
>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>,
>>>>>> the SplittableDoFn Kafka version allows taking TopicPartition and
>>>>>> startReadTime as elements and processing them during execution time,
>>>>>> instead of configuring topics at pipeline construction time. I'm
>>>>>> wondering whether there are other configurations we also want to
>>>>>> populate during pipeline execution time instead of construction time.
>>>>>> Taking these configurations as elements would be valuable when they
>>>>>> could be different for different TopicPartitions. For a list of
>>>>>> configurations we have now, please refer to KafkaIO.Read
>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>.
>>>>>>
>>>>>> 2. I also want to offer a simple way for KafkaIO.Read to expand with
>>>>>> the SDF version PTransform. Almost all configurations can be translated
>>>>>> easily from KafkaIO.Read to the SDF version read except a custom
>>>>>> TimestampPolicyFactory (it's easy to translate built-in default types
>>>>>> such as withProcessingTime
>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L710>,
>>>>>> withCreateTime
>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L726>
>>>>>> and withLogAppendTime
>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L699>).
>>>>>> With SplittableDoFn, we have WatermarkEstimator
>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java>
>>>>>> to track the watermark per TopicPartition. Thus, instead of
>>>>>> TimestampPolicyFactory
>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java>,
>>>>>> we need the user to provide a function which can extract the output
>>>>>> timestamp from a KafkaRecord (like withTimestampFn
>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>).
>>>>>> My question here is, are the default types enough for current
>>>>>> KafkaIO.Read users? Is a custom TimestampPolicy really that common? Is
>>>>>> it okay to use the current API withTimestampFn
>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>
>>>>>> in KafkaIO.Read to accept the custom function and pass it to the SDF
>>>>>> read transform?
>>>>>>
>>>>>> Thanks for your help!
>>>>>>
>>>>>> [1] https://beam.apache.org/blog/splittable-do-fn/
>>>>>> [2] https://s.apache.org/splittable-do-fn
>>>>>> [3] My prototype PR https://github.com/apache/beam/pull/11749
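
To make the idea discussed in this thread more concrete (a user-supplied
function that extracts the output timestamp, feeding a monotonically
increasing watermark tracked per TopicPartition), here is a minimal sketch in
plain Java. It does not use the Beam or Kafka APIs: Record and
MonotonicWatermarks are hypothetical stand-ins for KafkaRecord and a
per-partition WatermarkEstimator, and falling back to the epoch for a
partition with no records yet is an assumption made only for illustration.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class PerPartitionWatermarkSketch {

  /** Hypothetical stand-in for a Kafka record; NOT Beam's KafkaRecord. */
  static final class Record {
    final String topic;
    final int partition;
    final long createTimeMillis;
    final String value;

    Record(String topic, int partition, long createTimeMillis, String value) {
      this.topic = topic;
      this.partition = partition;
      this.createTimeMillis = createTimeMillis;
      this.value = value;
    }
  }

  /** Hypothetical helper: keeps a non-decreasing watermark per topic-partition. */
  static final class MonotonicWatermarks {
    private final Function<Record, Instant> timestampFn;
    private final Map<String, Instant> watermarks = new HashMap<>();

    MonotonicWatermarks(Function<Record, Instant> timestampFn) {
      this.timestampFn = timestampFn;
    }

    /** Extracts the record's output timestamp and advances that partition's watermark. */
    Instant observe(Record record) {
      Instant timestamp = timestampFn.apply(record);
      String key = record.topic + "-" + record.partition;
      // Keep the larger of the stored and the new timestamp so the watermark never moves back.
      watermarks.merge(key, timestamp, (old, cur) -> cur.isAfter(old) ? cur : old);
      return timestamp;
    }

    /** Returns the partition's watermark, or the epoch (an assumed default) if nothing was seen. */
    Instant currentWatermark(String topic, int partition) {
      return watermarks.getOrDefault(topic + "-" + partition, Instant.EPOCH);
    }
  }

  public static void main(String[] args) {
    // The user-defined function: here it simply reads the record's create time.
    MonotonicWatermarks wm =
        new MonotonicWatermarks(r -> Instant.ofEpochMilli(r.createTimeMillis));

    wm.observe(new Record("events", 0, 1_000L, "a"));
    wm.observe(new Record("events", 0, 500L, "b")); // older timestamp: watermark holds
    wm.observe(new Record("events", 1, 2_000L, "c"));

    System.out.println("events-0: " + wm.currentWatermark("events", 0));
    System.out.println("events-1: " + wm.currentWatermark("events", 1));
  }
}
```

A policy such as CustomTimestampPolicyWithLimitedDelay carries more state
than a single function from record to timestamp, which is why the thread
treats translating a custom TimestampPolicy as the hard case.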
