Hello Rahul,

Thanks again for the detailed explanation.

I need some guidance on what values to set for maxDelay and
previousWatermark in CustomTimestampPolicyWithLimitedDelay.

Currently, I am providing maxDelay as Duration.ZERO and previousWatermark
as Optional.empty().
With these values, I see that the getWatermark function always goes to the
else block (code link) and always returns TIMESTAMP_MIN_VALUE.
So in this case as well, the watermark is returned as
TIMESTAMP_MIN_VALUE for zero-throughput topics.
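
To make the question concrete, here is a minimal sketch of the idle-partition
fallback suggested earlier in this thread (default to
[current_timestamp - some_delay] and keep the watermark monotonic). It is plain
Java with java.time and does not use Beam's TimestampPolicy API; the class
IdleAwareWatermark, its methods, and the idle flag are hypothetical names used
for illustration only.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical stand-in for a per-partition timestamp policy. It is NOT a Beam
// class and does not use Beam's TimestampPolicy API; it only models the logic.
class IdleAwareWatermark {
    private final Duration maxDelay;
    // Largest event timestamp seen so far; null until the first record arrives.
    private Instant maxEventTimestamp = null;
    // Last watermark handed out; used to keep the watermark from moving backwards.
    private Instant lastWatermark = Instant.MIN;

    IdleAwareWatermark(Duration maxDelay) {
        this.maxDelay = maxDelay;
    }

    // Tracks the largest event timestamp and returns the record's own timestamp.
    Instant onRecord(Instant eventTimestamp) {
        if (maxEventTimestamp == null || eventTimestamp.isAfter(maxEventTimestamp)) {
            maxEventTimestamp = eventTimestamp;
        }
        return eventTimestamp;
    }

    // 'idle' is an assumed flag meaning "no unread records in this partition".
    Instant watermark(Instant now, boolean idle) {
        Instant candidate = (maxEventTimestamp == null || idle)
                ? now.minus(maxDelay)                // nothing to read: follow the clock
                : maxEventTimestamp.minus(maxDelay); // otherwise follow the data
        if (candidate.isAfter(lastWatermark)) {
            lastWatermark = candidate;               // never move backwards
        }
        return lastWatermark;
    }

    public static void main(String[] args) {
        IdleAwareWatermark wm = new IdleAwareWatermark(Duration.ofMinutes(1));
        Instant now = Instant.parse("2020-02-28T10:00:00Z");
        System.out.println(wm.watermark(now, true));  // idle partition, no records yet
        wm.onRecord(Instant.parse("2020-02-28T09:30:00Z"));
        System.out.println(wm.watermark(now, false)); // an older record does not pull it back
    }
}
```

In a sketch like this, a larger maxDelay tolerates more out-of-order events but
holds back window output by the same amount, and any record older than the
watermark would be treated as late.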

Please share your observations on how to tune the Timestamp Policy.

Thanks and regards,
Maulik


On Fri, Feb 28, 2020 at 8:46 PM rahul patwari <rahulpatwari8...@gmail.com>
wrote:

> Hi Maulik,
>
> Currently, I don't think it is possible to filter topics based on whether
> data is being produced to the topic (or) not.
> But, the Watermark logic can be changed to make the Pipeline work.
>
> Since the timestamps of the records are the time when the events are
> pushed to Kafka, every record will have monotonically increasing timestamps
> except for out of order events.
> Instead of assigning the Watermark as BoundedWindow.TIMESTAMP_MIN_VALUE
> by default, we can assign [current_timestamp - some_delay] as default and
> the same can be done in getWatermark() method, in which case, even if the
> partition is idle, Watermark will advance.
>
> Make sure that the timestamp of the Watermark is monotonically increasing
> and choose the delay carefully in order to avoid discarding out of order
> events.
>
> Refer to
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java
> for an example.
>
> Regards,
> Rahul
>
>
> On Fri, Feb 28, 2020 at 6:54 PM Maulik Soneji <maulik.son...@gojek.com>
> wrote:
>
>> Hi Rahul,
>>
>> Thank you very much for the detailed explanation.
>>
>> Since we don't know which are the topics that have zero throughputs, is
>> there a way in which we can filter out such topics in KafkaIO?
>>
>> Since KafkaIO doesn't support passing a regex to consume data from, I am
>> getting a list of topics from kafka and passing it.
>>
>> Is there a way to filter out such topics? Also, it can happen that when
>> the job has started the topic might have no data for a few windows and
>> after that, it can get some data. This filter should be dynamic as well.
>>
>> Please share some ideas on how we can make this work.
>>
>> Community members, please share your thoughts as well on how we can
>> achieve this.
>>
>> Thanks and regards,
>> Maulik
>>
>> On Fri, Feb 28, 2020 at 3:03 PM rahul patwari <rahulpatwari8...@gmail.com>
>> wrote:
>>
>>> Hi Maulik,
>>>
>>> This seems like an issue with Watermark.
>>> According to
>>> https://github.com/apache/beam/blob/f0930f958d47042948c06041e074ef9f1b0872d9/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L240
>>> ,
>>>
>>> If there are multiple partitions (or) multiple topics, the Watermark is
>>> calculated for each partition, and the minimum of those watermarks is
>>> taken as the current Watermark.
>>> Assuming that no message is pushed to the topic with 0 throughput,
>>> according to your logic for the watermark calculation, the watermark of
>>> each partition for this topic will be BoundedWindow.TIMESTAMP_MIN_VALUE
>>> (the smallest representable timestamp of an element -
>>> https://github.com/apache/beam/blob/f0930f958d47042948c06041e074ef9f1b0872d9/model/pipeline/src/main/proto/beam_runner_api.proto#L44
>>> ).
>>>
>>> GroupByKey emits a result only when the Watermark passes the end of
>>> the window. Since the watermark stays at BoundedWindow.TIMESTAMP_MIN_VALUE,
>>> you are not seeing any results from GroupByKey.
>>>
>>> Regards,
>>> Rahul
>>>
>>> On Fri, Feb 28, 2020 at 12:39 PM Maulik Soneji <maulik.son...@gojek.com>
>>> wrote:
>>>
>>>> *Observations:*
>>>> If we read using KafkaIO for a list of topics where one of the topics
>>>> has zero throughput,
>>>> and KafkaIO is followed by a GroupByKey stage, then:
>>>> a. No data is output from the GroupByKey stage for any of the topics, not
>>>> just the zero-throughput topic.
>>>>
>>>> If all topics have some throughput coming in, then it works fine and we
>>>> get some output from GroupByKey stage.
>>>>
>>>> Is this an issue?
>>>>
>>>> *Points:*
>>>> a. Output from GroupByKey appears only when all topics have some
>>>> throughput.
>>>> b. This is a problem with KafkaIO + GroupByKey. In the case where I have
>>>> FileIO + GroupByKey, this issue doesn't arise: GroupByKey outputs some data
>>>> even if there is no data for one of the files.
>>>> c. It is not a runner issue, since I ran it with both FlinkRunner and
>>>> DataflowRunner.
>>>> d. Even if lag is different for each topic on the list, we still get
>>>> some output from GroupByKey.
>>>>
>>>>
>>>> *Debugging:*
>>>> While debugging this issue, I found that in the split function
>>>> of KafkaUnboundedSource we create KafkaUnboundedSource where the partition
>>>> list is one partition for each topic.
>>>>
>>>> I am not sure if this is some issue with watermark, since watermark for
>>>> the topic with no throughput will not advance. But this looks like the most
>>>> likely cause to me.
>>>>
>>>> *Please help me in figuring out whether this is an issue or if there is
>>>> something wrong with my pipeline.*
>>>>
>>>> Attaching detailed pipeline information for more details:
>>>>
>>>> *Context:*
>>>> I am currently using KafkaIO to read data from kafka for a list of
>>>> topics with a custom timestamp policy.
>>>>
>>>> Below is how I am constructing KafkaIO reader:
>>>>
>>>> return KafkaIO.<byte[], byte[]>read()
>>>>         .withBootstrapServers(brokers)
>>>>         .withTopics(topics)
>>>>         .withKeyDeserializer(ByteArrayDeserializer.class)
>>>>         .withValueDeserializer(ByteArrayDeserializer.class)
>>>>         .withTimestampPolicyFactory((partition, previousWatermark) ->
>>>>             new EventTimestampPolicy(godataService, previousWatermark))
>>>>         .commitOffsetsInFinalize();
>>>>
>>>> *Pipeline Information:*
>>>> The pipeline consists of seven steps:
>>>> a. Read from Kafka with custom timestamp policy
>>>> b. Convert KafkaRecord to Message object
>>>> c. Window based on FixedWindow of 10 minutes triggering AfterWatermark
>>>> d. PCollection<Message> to PCollection<KV<String, Message>> where Topic is Key
>>>> e. GroupByKey.create() to get PCollection<KV<String, Iterable<Message>>>
>>>> f. PCollection<KV<String, Iterable<Message>>> to PCollection<ComputedMetrics> for each topic
>>>> g. Write output to kafka
>>>>
>>>> *Detailed Pipeline Information*
>>>> a. Read data from kafka to get KafkaRecord<byte[], byte[]>
>>>> Here I am using my own timestamp policy which looks like below:
>>>>
>>>> public EventTimestampPolicy(MyService myService, Optional<Instant> previousWatermark) {
>>>>     this.myService = myService;
>>>>     this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
>>>> }
>>>>
>>>> @Override
>>>> public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<byte[], byte[]> record) {
>>>>     Instant eventTimestamp;
>>>>     try {
>>>>         eventTimestamp = Deserializer.getEventTimestamp(record, myService);
>>>>     } catch (InvalidProtocolBufferException e) {
>>>>         statsClient.increment("io.proto.buffer.exception");
>>>>         throw new RuntimeException(e);
>>>>     }
>>>>     this.currentWatermark = eventTimestamp;
>>>>     return this.currentWatermark;
>>>> }
>>>>
>>>> @Override
>>>> public Instant getWatermark(PartitionContext ctx) {
>>>>     return this.currentWatermark;
>>>> }
>>>>
>>>> Event timestamp is one of the fields in the kafka message. It is the
>>>> time when the event was pushed to kafka.
>>>>
>>>> b. DoFn to transform KafkaRecord<byte[], byte[]> to Message class. The
>>>> Message class contains properties like topic, partition, offset
>>>> and timestamp
>>>>
>>>> c. Windowing on 10 minute fixed window triggering at 
>>>> AfterWatermark.pastEndOfWindow()
>>>>
>>>> d. PCollection<Message> to PCollection<KV<String, Message>>
>>>> Here Key is the kafka topic.
>>>>
>>>> e. GroupByKey to get PCollection<KV<String, Iterable<Message>>>
>>>>
>>>> f. PCollection<KV<String, Iterable<Message>>> to
>>>> PCollection<ComputedMetrics> for each topic
>>>>
>>>> g. Write output to kafka
>>>>
>>>>
