Are you able to support CustomTimestampPolicyWithLimitedDelay?

On Fri, May 29, 2020 at 9:58 AM Boyuan Zhang <[email protected]> wrote:
> Yes, the WatermarkEstimator tracks the output timestamp and gives an estimate
> of the watermark per partition.
> The problem is that a user can configure KafkaIO.Read() with a custom
> TimestampPolicy, which cannot be translated into a simple function that
> extracts a timestamp from a KafkaRecord. In this case, I need to either ask
> the user to write such a function or support only the built-in types.
>
> On Thu, May 28, 2020 at 9:12 PM Reuven Lax <[email protected]> wrote:
>
>> This is per-partition, right? In that case I assume it will match the
>> current Kafka watermark.
>>
>> On Thu, May 28, 2020 at 9:03 PM Boyuan Zhang <[email protected]> wrote:
>>
>>> Hi Reuven,
>>>
>>> I'm going to use MonotonicallyIncreasing
>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java#L105>
>>> by default, and in the future we may want to support custom estimators
>>> if there is a request.
>>>
>>> On Thu, May 28, 2020 at 8:54 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> Which WatermarkEstimator do you think should be used?
>>>>
>>>> On Thu, May 28, 2020 at 7:17 PM Boyuan Zhang <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi team,
>>>>>
>>>>> I'm Boyuan, currently working on building a Kafka read PTransform on
>>>>> top of SplittableDoFn [1][2][3]. There are two questions about Kafka
>>>>> usage I want to discuss with you:
>>>>>
>>>>> 1. Compared to KafkaIO.Read
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>,
>>>>> the SplittableDoFn Kafka version allows taking TopicPartition and
>>>>> startReadTime as elements and processing them at execution time,
>>>>> instead of configuring topics at pipeline construction time. I'm wondering
>>>>> whether there are other configurations we also want to populate at
>>>>> pipeline execution time instead of construction time.
>>>>> Taking these configurations as elements would be valuable when they
>>>>> could be different for different TopicPartitions. For a list of the
>>>>> configurations we have now, please refer to KafkaIO.Read
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>.
>>>>>
>>>>> 2. I also want to offer a simple way for KafkaIO.Read to expand with
>>>>> the SDF version PTransform. Almost all configurations can be translated
>>>>> easily from KafkaIO.Read to the SDF version read, except a custom
>>>>> TimestampPolicyFactory. (It's easy to translate the built-in default
>>>>> types such as withProcessingTime
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L710>,
>>>>> withCreateTime
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L726>
>>>>> and withLogAppendTime
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L699>.)
>>>>> With SplittableDoFn, we have WatermarkEstimator
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java>
>>>>> to track the watermark per TopicPartition. Thus, instead of
>>>>> TimestampPolicyFactory
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java>,
>>>>> we need the user to provide a function that can extract the output
>>>>> timestamp from a KafkaRecord (like withTimestampFn
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>).
>>>>> My question here is: are the default types enough for current
>>>>> KafkaIO.Read users? Is a custom TimestampPolicy really common?
>>>>> Is it okay to use the current API withTimestampFn
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>
>>>>> in KafkaIO.Read to accept the custom function and pass it to the SDF
>>>>> read transform?
>>>>>
>>>>> Thanks for your help!
>>>>>
>>>>> [1] https://beam.apache.org/blog/splittable-do-fn/
>>>>> [2] https://s.apache.org/splittable-do-fn
>>>>> [3] My prototype PR https://github.com/apache/beam/pull/11749
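
To make the proposal in the thread concrete, here is a minimal standalone sketch of the idea: a user-supplied function extracts a timestamp from each record, and a watermark that only moves forward is kept per partition. This is a simplified model and not Beam's or Kafka's API. Record, MonotonicWatermark, Tracker and every field name below are hypothetical stand-ins chosen for illustration; the real KafkaRecord, WatermarkEstimator and TimestampPolicy types have different shapes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

/**
 * Simplified, standalone model of per-partition watermark tracking.
 * None of these types are Beam or Kafka classes; all names are hypothetical.
 */
public class PerPartitionWatermarkSketch {

  /** Hypothetical stand-in for a Kafka record: a partition id plus two candidate timestamps. */
  static final class Record {
    final int partition;
    final long createTimeMillis;
    final long logAppendTimeMillis;

    Record(int partition, long createTimeMillis, long logAppendTimeMillis) {
      this.partition = partition;
      this.createTimeMillis = createTimeMillis;
      this.logAppendTimeMillis = logAppendTimeMillis;
    }
  }

  /** A watermark that never moves backwards, in the spirit of a monotonically increasing estimator. */
  static final class MonotonicWatermark {
    private long watermarkMillis = Long.MIN_VALUE;

    void observe(long timestampMillis) {
      watermarkMillis = Math.max(watermarkMillis, timestampMillis);
    }

    long current() {
      return watermarkMillis;
    }
  }

  /** Keeps one watermark per partition, driven by a user-supplied timestamp extraction function. */
  static final class Tracker {
    private final ToLongFunction<Record> timestampFn;
    private final Map<Integer, MonotonicWatermark> perPartition = new HashMap<>();

    Tracker(ToLongFunction<Record> timestampFn) {
      this.timestampFn = timestampFn;
    }

    /** Extracts the output timestamp for the record and advances that partition's watermark. */
    long process(Record record) {
      long timestamp = timestampFn.applyAsLong(record);
      perPartition
          .computeIfAbsent(record.partition, p -> new MonotonicWatermark())
          .observe(timestamp);
      return timestamp;
    }

    /** Returns Long.MIN_VALUE for a partition that has not produced any record yet. */
    long watermark(int partition) {
      MonotonicWatermark w = perPartition.get(partition);
      return w == null ? Long.MIN_VALUE : w.current();
    }
  }

  public static void main(String[] args) {
    // Use the record's create time as the output timestamp.
    Tracker tracker = new Tracker(r -> r.createTimeMillis);
    tracker.process(new Record(0, 1_000L, 1_500L));
    tracker.process(new Record(0, 900L, 1_600L)); // out of order: watermark stays at 1000
    tracker.process(new Record(1, 2_000L, 2_100L));
    System.out.println("partition 0 watermark: " + tracker.watermark(0));
    System.out.println("partition 1 watermark: " + tracker.watermark(1));
  }
}
```

In this model, switching from create time to log-append time only means passing a different function (r -> r.logAppendTimeMillis). A policy that derives the watermark from extra state, rather than from the record timestamp alone, does not fit into the single function, which is the limitation raised in the thread.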
