Hello Rahul,

Thanks again for the detailed explanation.

I need some guidance on what values to set for maxDelay and previousWatermark in CustomTimestampPolicyWithLimitedDelay. Currently I am providing maxDelay as Duration.ZERO and previousWatermark as Optional.empty(). With these values, I see that the getWatermark function always goes to the else block (code link) and always returns TIMESTAMP_MIN_VALUE. So in this case as well, the watermark returned for zero-throughput topics is TIMESTAMP_MIN_VALUE.
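For reference, below is roughly how the policy is wired up today (a minimal sketch; the timestamp-extraction lambda is illustrative, not the exact code):

return KafkaIO.<byte[], byte[]>read()
    // ... bootstrap servers, topics, and deserializers as in the pipeline below ...
    .withTimestampPolicyFactory(
        (partition, previousWatermark) ->
            new CustomTimestampPolicyWithLimitedDelay<>(
                record -> new Instant(record.getTimestamp()), // illustrative extractor
                Duration.ZERO,                                // maxDelay: the value in question
                Optional.empty()));                           // previousWatermark: currently always empty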
Please share your observations on how to tune the timestamp policy.

Thanks and regards,
Maulik

On Fri, Feb 28, 2020 at 8:46 PM rahul patwari <rahulpatwari8...@gmail.com> wrote:

> Hi Maulik,
>
> Currently, I don't think it is possible to filter topics based on whether
> data is being produced to the topic or not.
> But the watermark logic can be changed to make the pipeline work.
>
> Since the timestamps of the records are the times when the events were
> pushed to Kafka, every record will have a monotonically increasing
> timestamp, except for out-of-order events.
> Instead of assigning the watermark as BoundedWindow.TIMESTAMP_MIN_VALUE by
> default, we can assign [current_timestamp - some_delay] as the default and
> do the same in the getWatermark() method, in which case the watermark will
> advance even if the partition is idle.
>
> Make sure that the timestamp of the watermark is monotonically increasing,
> and choose the delay carefully in order to avoid discarding out-of-order
> events.
>
> Refer to
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java
> for an example.
>
> Regards,
> Rahul
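A minimal sketch of the idle-advance approach described above (the class name, the one-minute delay, and the use of the Kafka record timestamp as event time are all illustrative, not from the thread):

public class IdleAdvancingTimestampPolicy extends TimestampPolicy<byte[], byte[]> {
    // Delay budget for out-of-order events; choose carefully, since events
    // arriving more than MAX_DELAY behind the wall clock will be late.
    private static final Duration MAX_DELAY = Duration.standardMinutes(1);
    private Instant currentWatermark;

    public IdleAdvancingTimestampPolicy(Optional<Instant> previousWatermark) {
        // Default to (now - delay) instead of TIMESTAMP_MIN_VALUE, so a partition
        // that never receives a record still reports a recent watermark.
        this.currentWatermark = previousWatermark.orElse(Instant.now().minus(MAX_DELAY));
    }

    @Override
    public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<byte[], byte[]> record) {
        Instant eventTimestamp = new Instant(record.getTimestamp()); // illustrative extractor
        // Only move the watermark forward, never backwards.
        if (eventTimestamp.isAfter(currentWatermark)) {
            currentWatermark = eventTimestamp;
        }
        return eventTimestamp;
    }

    @Override
    public Instant getWatermark(PartitionContext ctx) {
        // Advance the watermark even while the partition is idle,
        // keeping it monotonically increasing.
        Instant idleWatermark = Instant.now().minus(MAX_DELAY);
        if (idleWatermark.isAfter(currentWatermark)) {
            currentWatermark = idleWatermark;
        }
        return currentWatermark;
    }
}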
> On Fri, Feb 28, 2020 at 6:54 PM Maulik Soneji <maulik.son...@gojek.com>
> wrote:
>
>> Hi Rahul,
>>
>> Thank you very much for the detailed explanation.
>>
>> Since we don't know which topics have zero throughput, is there a way to
>> filter out such topics in KafkaIO?
>>
>> Since KafkaIO doesn't support passing a regex to consume data from, I am
>> getting a list of topics from Kafka and passing it in.
>>
>> Is there a way to filter out such topics? Also, a topic might have no
>> data for a few windows after the job starts and then receive some data
>> later, so the filter would need to be dynamic as well.
>>
>> Please share some ideas on how we can make this work.
>>
>> Community members, please share your thoughts as well on how we can
>> achieve this.
>>
>> Thanks and regards,
>> Maulik
>>
>> On Fri, Feb 28, 2020 at 3:03 PM rahul patwari <rahulpatwari8...@gmail.com>
>> wrote:
>>
>>> Hi Maulik,
>>>
>>> This seems like an issue with the watermark.
>>> According to
>>> https://github.com/apache/beam/blob/f0930f958d47042948c06041e074ef9f1b0872d9/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L240,
>>> if there are multiple partitions (or multiple topics), the watermark is
>>> calculated for each partition, and the minimum of those watermarks is
>>> taken as the current watermark.
>>> Assuming that no message is pushed to the topic with 0 throughput,
>>> according to your logic for the watermark calculation, the watermark of
>>> each partition of this topic will be BoundedWindow.TIMESTAMP_MIN_VALUE
>>> (the smallest representable timestamp of an element -
>>> https://github.com/apache/beam/blob/f0930f958d47042948c06041e074ef9f1b0872d9/model/pipeline/src/main/proto/beam_runner_api.proto#L44).
>>>
>>> As results are emitted from GroupByKey only when the watermark crosses
>>> the end of the window, and the watermark here stays at
>>> BoundedWindow.TIMESTAMP_MIN_VALUE, you are not seeing any results from
>>> GroupByKey.
>>>
>>> Regards,
>>> Rahul
>>>
>>> On Fri, Feb 28, 2020 at 12:39 PM Maulik Soneji <maulik.son...@gojek.com>
>>> wrote:
>>>
>>>> *Observations:*
>>>> If we read using KafkaIO from a list of topics where one of the topics
>>>> has zero throughput, and KafkaIO is followed by a GroupByKey stage, then
>>>> no data is output from the GroupByKey stage for any of the topics, not
>>>> just the zero-throughput topic.
>>>>
>>>> If all topics have some throughput coming in, then it works fine and we
>>>> get some output from the GroupByKey stage.
>>>>
>>>> Is this an issue?
>>>>
>>>> *Points:*
>>>> a. Output comes from GroupByKey only when all topics have some
>>>> throughput.
>>>> b. This is a problem with KafkaIO + GroupByKey. For the case where I
>>>> have FileIO + GroupByKey, this issue doesn't arise: GroupByKey outputs
>>>> some data even if there is no data for one of the files.
>>>> c. It is not a runner issue, since I ran it with both FlinkRunner and
>>>> DataflowRunner.
>>>> d. Even if the lag is different for each topic in the list, we still
>>>> get some output from GroupByKey.
>>>>
>>>> *Debugging:*
>>>> While debugging this issue, I found that in the split function of
>>>> KafkaUnboundedSource, we create KafkaUnboundedSources whose partition
>>>> list has one partition from each topic.
>>>>
>>>> I am not sure if this is an issue with the watermark, but since the
>>>> watermark for the topic with no throughput will not advance, this looks
>>>> like the most likely cause to me.
>>>>
>>>> *Please help me in figuring out whether this is an issue or if there is
>>>> something wrong with my pipeline.*
>>>>
>>>> Attaching detailed pipeline information for more details:
>>>>
>>>> *Context:*
>>>> I am currently using KafkaIO to read data from Kafka for a list of
>>>> topics with a custom timestamp policy.
>>>>
>>>> Below is how I am constructing the KafkaIO reader:
>>>>
>>>> return KafkaIO.<byte[], byte[]>read()
>>>>     .withBootstrapServers(brokers)
>>>>     .withTopics(topics)
>>>>     .withKeyDeserializer(ByteArrayDeserializer.class)
>>>>     .withValueDeserializer(ByteArrayDeserializer.class)
>>>>     .withTimestampPolicyFactory((partition, previousWatermark) ->
>>>>         new EventTimestampPolicy(godataService, previousWatermark))
>>>>     .commitOffsetsInFinalize();
>>>>
>>>> *Pipeline Information:*
>>>> The pipeline consists of seven steps:
>>>> a. Read from Kafka with a custom timestamp policy
>>>> b. Convert KafkaRecord to a Message object
>>>> c. Window into fixed windows of 10 minutes, triggering at AfterWatermark
>>>> (steps c-e are sketched after this list)
>>>> d. PCollection<Message> to PCollection<KV<String, Message>>, where the
>>>> topic is the key
>>>> e. GroupByKey.create() to get PCollection<KV<String, Iterable<Message>>>
>>>> f. PCollection<KV<String, Iterable<Message>>> to
>>>> PCollection<ComputedMetrics> for each topic
>>>> g. Write output to Kafka
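A minimal sketch of steps c, d, and e (assuming a PCollection<Message> named messages from step b and that Message exposes getTopic(); the allowed-lateness and accumulation settings are also assumptions, since the thread does not specify them):

PCollection<KV<String, Iterable<Message>>> grouped = messages
    // c. 10-minute fixed windows, emitting when the watermark passes the end of the window
    .apply(Window.<Message>into(FixedWindows.of(Duration.standardMinutes(10)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    // d. key each message by its Kafka topic
    .apply(WithKeys.of((Message m) -> m.getTopic())
        .withKeyType(TypeDescriptors.strings()))
    // e. group by topic within each window
    .apply(GroupByKey.create());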
>>>> *Detailed Pipeline Information:*
>>>> a. Read data from Kafka to get KafkaRecord<byte[], byte[]>.
>>>> Here I am using my own timestamp policy, which looks like this:
>>>>
>>>> public class EventTimestampPolicy extends TimestampPolicy<byte[], byte[]> {
>>>>     private final MyService myService;
>>>>     private StatsClient statsClient; // metrics client, initialized elsewhere
>>>>     private Instant currentWatermark;
>>>>
>>>>     public EventTimestampPolicy(MyService myService, Optional<Instant> previousWatermark) {
>>>>         this.myService = myService;
>>>>         this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
>>>>     }
>>>>
>>>>     @Override
>>>>     public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<byte[], byte[]> record) {
>>>>         Instant eventTimestamp;
>>>>         try {
>>>>             eventTimestamp = Deserializer.getEventTimestamp(record, myService);
>>>>         } catch (InvalidProtocolBufferException e) {
>>>>             statsClient.increment("io.proto.buffer.exception");
>>>>             throw new RuntimeException(e);
>>>>         }
>>>>         this.currentWatermark = eventTimestamp;
>>>>         return this.currentWatermark;
>>>>     }
>>>>
>>>>     @Override
>>>>     public Instant getWatermark(PartitionContext ctx) {
>>>>         return this.currentWatermark;
>>>>     }
>>>> }
>>>>
>>>> The event timestamp is one of the fields in the Kafka message. It is the
>>>> time when the event was pushed to Kafka.
>>>>
>>>> b. A DoFn to transform KafkaRecord<byte[], byte[]> into the Message class.
>>>> The Message class contains properties like topic, partition, offset, and
>>>> timestamp.
>>>>
>>>> c. Windowing into a 10-minute fixed window, triggering at
>>>> AfterWatermark.pastEndOfWindow().
>>>>
>>>> d. PCollection<Message> to PCollection<KV<String, Message>>, where the
>>>> key is the Kafka topic.
>>>>
>>>> e. GroupByKey to get PCollection<KV<String, Iterable<Message>>>.
>>>>
>>>> f. PCollection<KV<String, Iterable<Message>>> to
>>>> PCollection<ComputedMetrics> for each topic.
>>>>
>>>> g. Write output to Kafka.
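A minimal sketch of step g (the output topic name is illustrative, and serialize is a hypothetical helper that turns ComputedMetrics into bytes):

computedMetrics
    // serialize each per-topic result to bytes (pipeline-specific)
    .apply(MapElements
        .into(TypeDescriptor.of(byte[].class))
        .via(metrics -> serialize(metrics)))
    .apply(KafkaIO.<Void, byte[]>write()
        .withBootstrapServers(brokers)
        .withTopic("computed-metrics") // illustrative output topic
        .withValueSerializer(ByteArraySerializer.class)
        .values()); // write values only, without keys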