Are you able to support CustomTimestampPolicyWithLimitedDelay?
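
For context, here is a stdlib-only model (not Beam code) of the watermark rule that, as far as I recall, the CustomTimestampPolicyWithLimitedDelay Javadoc documents: `min(now, max event timestamp so far) - maxDelay`. The class and method names are hypothetical, and the policy's special handling of idle partitions is deliberately left out. The model suggests why this policy is harder to translate: it needs per-partition state and a wall clock, not just a per-record timestamp function.

```java
import java.time.Duration;
import java.time.Instant;

// Stdlib-only model of a limited-delay watermark; NOT Beam's implementation.
// Hypothetical names; idle-partition handling is intentionally omitted.
final class LimitedDelayWatermarkModel {
  private final Duration maxDelay;
  // Largest event timestamp seen so far in this partition (EPOCH before any record).
  private Instant maxEventTimestamp = Instant.EPOCH;

  LimitedDelayWatermarkModel(Duration maxDelay) {
    this.maxDelay = maxDelay;
  }

  // Called for each record; out-of-order (older) timestamps do not lower the max.
  void observe(Instant eventTimestamp) {
    if (eventTimestamp.isAfter(maxEventTimestamp)) {
      maxEventTimestamp = eventTimestamp;
    }
  }

  // watermark = min(now, maxEventTimestamp) - maxDelay, so a future-dated record
  // cannot push the watermark past "now - maxDelay".
  Instant watermark(Instant now) {
    Instant capped = maxEventTimestamp.isAfter(now) ? now : maxEventTimestamp;
    return capped.minus(maxDelay);
  }
}
```

With a one-minute delay, a record dated one hour in the future only moves the watermark up to `now - 1 minute`.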

On Fri, May 29, 2020 at 9:58 AM Boyuan Zhang <[email protected]> wrote:

> Yes, the WatermarkEstimator tracks output timestamps and gives a watermark
> estimate per partition.
> The problem is that a user can configure KafkaIO.Read() with a custom
> TimestampPolicy, which cannot be translated into a simple function that
> extracts a timestamp from a KafkaRecord. In that case, I need to either ask
> the user to write such a function or support only the built-in
> types.
>
> On Thu, May 28, 2020 at 9:12 PM Reuven Lax <[email protected]> wrote:
>
>> This is per-partition, right? In that case I assume it will match the
>> current Kafka watermark.
>>
>> On Thu, May 28, 2020 at 9:03 PM Boyuan Zhang <[email protected]> wrote:
>>
>>> Hi Reuven,
>>>
>>> I'm going to use MonotonicallyIncreasing
>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java#L105>
>>> by default. In the future, we may want to support custom estimators if
>>> there is demand.
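
A rough stdlib model (hypothetical names, not Beam's WatermarkEstimators.MonotonicallyIncreasing) of a per-partition watermark that only moves forward with observed output timestamps. A plain "topic-partition" string stands in for Kafka's TopicPartition. One assumption to flag: this sketch keeps the maximum, so an out-of-order timestamp is simply ignored; it makes no claim about how Beam's estimator treats such timestamps.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: one forward-only watermark per partition.
final class PerPartitionMonotonicWatermarks {
  // Keyed by a "topic-partition" string standing in for Kafka's TopicPartition.
  private final Map<String, Instant> watermarks = new HashMap<>();

  // Records an output timestamp; the partition's watermark never moves backward.
  void observe(String topicPartition, Instant outputTimestamp) {
    watermarks.merge(
        topicPartition,
        outputTimestamp,
        (current, next) -> next.isAfter(current) ? next : current);
  }

  // Returns the partition's watermark, or EPOCH if nothing was observed yet.
  Instant watermark(String topicPartition) {
    return watermarks.getOrDefault(topicPartition, Instant.EPOCH);
  }
}
```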
>>>
>>> On Thu, May 28, 2020 at 8:54 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> Which WatermarkEstimator do you think should be used?
>>>>
>>>> On Thu, May 28, 2020 at 7:17 PM Boyuan Zhang <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi team,
>>>>>
>>>>> I'm Boyuan, currently working on building a Kafka read PTransform on
>>>>> top of SplittableDoFn[1][2][3]. There are two questions about Kafka usage
>>>>> I want to discuss with you:
>>>>>
>>>>> 1. Compared to KafkaIO.Read
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>,
>>>>> the SplittableDoFn Kafka version takes TopicPartition and
>>>>> startReadTime as elements and processes them at execution time,
>>>>> instead of configuring topics at pipeline construction time. I'm wondering
>>>>> whether there are other configurations we also want to populate at
>>>>> pipeline execution time instead of construction time. Taking these
>>>>> configurations as elements would be valuable when they can differ
>>>>> per TopicPartition. For the list of configurations we have now,
>>>>> please refer to KafkaIO.Read
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>.
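
To illustrate what "configuration as an element" could mean, here is a stdlib-only sketch in which each element carries its own TopicPartition-like fields and startReadTime, and the start offset is resolved only at execution time. `PartitionReadRequest` and `StartOffsetResolver` are hypothetical, and a list of per-offset timestamps stands in for the partition log. The lookup rule (earliest offset whose timestamp is at or after the requested time) mirrors the documented semantics of Kafka's `Consumer#offsetsForTimes`.

```java
import java.time.Instant;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical element: per-partition read configuration supplied at execution time.
final class PartitionReadRequest {
  final String topic;
  final int partition;
  final Instant startReadTime;

  PartitionReadRequest(String topic, int partition, Instant startReadTime) {
    this.topic = topic;
    this.partition = partition;
    this.startReadTime = startReadTime;
  }
}

final class StartOffsetResolver {
  // timestampsByOffset.get(i) is the timestamp of the record at offset i.
  // Returns the earliest offset whose timestamp is >= startReadTime, or empty
  // if no such record exists yet.
  static OptionalLong resolve(PartitionReadRequest request, List<Instant> timestampsByOffset) {
    for (int offset = 0; offset < timestampsByOffset.size(); offset++) {
      if (!timestampsByOffset.get(offset).isBefore(request.startReadTime)) {
        return OptionalLong.of(offset);
      }
    }
    return OptionalLong.empty();
  }
}
```

Because each element carries its own startReadTime, two partitions of the same topic can start from different points in time without any construction-time configuration.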
>>>>>
>>>>> 2. I also want to offer a simple way for KafkaIO.Read to expand into
>>>>> the SDF-based PTransform. Almost all configurations translate easily
>>>>> from KafkaIO.Read to the SDF read, except a custom
>>>>> TimestampPolicyFactory. (The built-in default types are easy to translate,
>>>>> such as withProcessingTime
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L710>,
>>>>> withCreateTime
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L726>
>>>>> and withLogAppendTime
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L699>.)
>>>>> With SplittableDoFn, we have WatermarkEstimator
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java>
>>>>> to track the watermark per TopicPartition. Thus, instead of a
>>>>> TimestampPolicyFactory
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java>,
>>>>> we need the user to provide a function that extracts the output timestamp
>>>>> from a KafkaRecord (like withTimestampFn
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>).
>>>>> My questions are: Are the default types enough for current KafkaIO.Read
>>>>> users? Is a custom TimestampPolicy really common? Is it okay to use the
>>>>> current withTimestampFn API
>>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>
>>>>> in KafkaIO.Read to accept the custom function and pass it to the SDF read
>>>>> transform?
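
A stdlib-only sketch of why the built-in types translate easily: each one reduces to a stateless function from a record to an output timestamp. `SimpleRecord` is a hypothetical stand-in for KafkaRecord carrying only a timestamp and its type, and `TimestampFns` is likewise hypothetical. Rejecting a record whose timestamp type does not match the requested one is a design choice of this sketch, not a claim about KafkaIO's behavior. The clock is injected as a `Supplier` so processing time stays testable.

```java
import java.time.Instant;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for a Kafka record: just the fields needed here.
final class SimpleRecord {
  enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

  final long timestampMillis;
  final TimestampType timestampType;

  SimpleRecord(long timestampMillis, TimestampType timestampType) {
    this.timestampMillis = timestampMillis;
    this.timestampType = timestampType;
  }
}

final class TimestampFns {
  // Processing time: ignore the record and read the (injected) wall clock.
  static Function<SimpleRecord, Instant> processingTime(Supplier<Instant> clock) {
    return record -> clock.get();
  }

  // Create time / log-append time: use the record's own timestamp, rejecting
  // records whose timestamp type differs from the one requested.
  static Function<SimpleRecord, Instant> recordTime(SimpleRecord.TimestampType expected) {
    return record -> {
      if (record.timestampType != expected) {
        throw new IllegalArgumentException(
            "Expected " + expected + " but record has " + record.timestampType);
      }
      return Instant.ofEpochMilli(record.timestampMillis);
    };
  }
}
```

A custom TimestampPolicy, by contrast, also decides the watermark (possibly from backlog or idleness), which a per-record function like these cannot express.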
>>>>>
>>>>> Thanks for your help!
>>>>>
>>>>> [1] https://beam.apache.org/blog/splittable-do-fn/
>>>>> [2] https://s.apache.org/splittable-do-fn
>>>>> [3] My prototype PR https://github.com/apache/beam/pull/11749
>>>>>
>>>>
