*Observations:*
If we use KafkaIO to read from a list of topics where one of the topics has
zero throughput, and KafkaIO is followed by a GroupByKey stage, then:
a. No data is output from the GroupByKey stage for any of the topics, not
just the zero-throughput topic.
If all topics have some throughput coming in, everything works fine and we
get output from the GroupByKey stage.
Is this an issue?
*Points:*
a. GroupByKey only produces output when all topics have some throughput.
b. This is specific to KafkaIO + GroupByKey; with FileIO + GroupByKey the
issue doesn't arise. GroupByKey outputs data even if one of the files has
no data.
c. It is not a runner issue, since I reproduced it with both FlinkRunner
and DataflowRunner.
d. Even if the lag is different for each topic on the list, we still get
some output from GroupByKey.
*Debugging:*
While debugging this issue I found that in the split() function of
KafkaUnboundedSource, each generated KafkaUnboundedSource is assigned a
partition list containing one partition from each topic.
I am not sure whether this is a watermark issue, since the watermark for
the topic with no throughput will never advance. But this looks like the
most likely cause to me.
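My understanding (an assumption on my part, not verified against the
KafkaIO source) is that a split's watermark is the minimum over the
watermarks of its partitions, roughly like the sketch below; names such as
PartitionState are illustrative, not Beam's actual code.

// Hypothetical sketch: combine per-partition watermarks into the split's
// watermark by taking the minimum over all assigned partitions.
Instant splitWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
for (PartitionState partition : partitionStates) {
    Instant partitionWatermark = partition.currentWatermark();
    if (partitionWatermark.isBefore(splitWatermark)) {
        // One idle partition pins the whole split's watermark.
        splitWatermark = partitionWatermark;
    }
}

If that is right, a partition that never receives data keeps the split's
watermark at TIMESTAMP_MIN_VALUE, so the 10-minute windows never close for
any topic read by that split.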
*Please help me figure out whether this is an issue or whether there is
something wrong with my pipeline.*
Detailed pipeline information follows:
*Context:*
I am currently using KafkaIO to read data from Kafka for a list of topics,
with a custom timestamp policy.
Below is how I am constructing the KafkaIO reader:
return KafkaIO.<byte[], byte[]>read()
    .withBootstrapServers(brokers)
    .withTopics(topics)
    .withKeyDeserializer(ByteArrayDeserializer.class)
    .withValueDeserializer(ByteArrayDeserializer.class)
    .withTimestampPolicyFactory((partition, previousWatermark) ->
        new EventTimestampPolicy(godataService, previousWatermark))
    .commitOffsetsInFinalize();
*Pipeline Information:*
The pipeline consists of seven steps:
a. Read from Kafka with a custom timestamp policy
b. Convert KafkaRecord to a Message object
c. Window into fixed windows of 10 minutes, triggering at AfterWatermark
d. PCollection<Message> to PCollection<KV<String, Message>>, where the
topic is the key
e. GroupByKey.create() to get PCollection<KV<String, Iterable<Message>>>
f. PCollection<KV<String, Iterable<Message>>> to
PCollection<ComputedMetrics> for each topic
g. Write output to Kafka
*Detailed Pipeline Information*
a. Read data from Kafka to get KafkaRecord<byte[], byte[]>.
Here I am using my own timestamp policy, which looks like this:
public EventTimestampPolicy(MyService myService,
        Optional<Instant> previousWatermark) {
    this.myService = myService;
    this.currentWatermark =
        previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
}

@Override
public Instant getTimestampForRecord(PartitionContext context,
        KafkaRecord<byte[], byte[]> record) {
    Instant eventTimestamp;
    try {
        eventTimestamp = Deserializer.getEventTimestamp(record, myService);
    } catch (InvalidProtocolBufferException e) {
        statsClient.increment("io.proto.buffer.exception");
        throw new RuntimeException(e);
    }
    // The watermark only advances when a record arrives on this partition.
    this.currentWatermark = eventTimestamp;
    return this.currentWatermark;
}

@Override
public Instant getWatermark(PartitionContext ctx) {
    return this.currentWatermark;
}
The event timestamp is one of the fields in the Kafka message; it is the
time when the event was pushed to Kafka.
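If the watermark is the problem, one mitigation I am considering (only a
sketch; maxDelay would be a new Duration field here, and the idea mirrors
what Beam's CustomTimestampPolicyWithLimitedDelay does for idle
partitions) is to advance the watermark when the partition has no backlog:

@Override
public Instant getWatermark(PartitionContext ctx) {
    // If the partition is fully caught up, assume no older events are in
    // flight and move the watermark close to the backlog-check time, so an
    // idle topic does not hold back windowing for all the other topics.
    if (ctx.getMessageBacklog() == 0) {
        Instant idleWatermark = ctx.getBacklogCheckTime().minus(maxDelay);
        if (idleWatermark.isAfter(currentWatermark)) {
            currentWatermark = idleWatermark;
        }
    }
    return currentWatermark;
}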
b. A DoFn to transform KafkaRecord<byte[], byte[]> into a Message object.
The Message class contains properties like topic, partition, offset, and
timestamp.
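For reference, the conversion looks roughly like this (Message and its
constructor are my own code; the shape shown here is only a sketch):

class KafkaRecordToMessageFn extends DoFn<KafkaRecord<byte[], byte[]>, Message> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        KafkaRecord<byte[], byte[]> record = c.element();
        // Message is my own class; this constructor is illustrative.
        c.output(new Message(
            record.getTopic(),
            record.getPartition(),
            record.getOffset(),
            record.getTimestamp()));
    }
}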
c. Windowing into fixed windows of 10 minutes, triggering at
AfterWatermark.pastEndOfWindow().
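Concretely, the windowing transform is roughly the following (the allowed
lateness and accumulation mode shown are illustrative defaults, not
necessarily relevant to the problem):

messages.apply(Window.<Message>into(FixedWindows.of(Duration.standardMinutes(10)))
    .triggering(AfterWatermark.pastEndOfWindow())
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());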
d. PCollection<Message> to PCollection<KV<String, Message>>, where the key
is the Kafka topic.
e. GroupByKey to get PCollection<KV<String, Iterable<Message>>>.
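Steps d and e together look roughly like this (getTopic() on Message is my
own accessor):

PCollection<KV<String, Message>> keyed = windowedMessages.apply(
    MapElements
        .into(TypeDescriptors.kvs(
            TypeDescriptors.strings(), TypeDescriptor.of(Message.class)))
        .via(message -> KV.of(message.getTopic(), message)));

PCollection<KV<String, Iterable<Message>>> grouped =
    keyed.apply(GroupByKey.create());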
f. PCollection<KV<String, Iterable<Message>>> to
PCollection<ComputedMetrics> for each topic.
g. Write output to Kafka.
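The final write is roughly the following sketch; the output topic name,
the serializers, and the prior conversion of ComputedMetrics into
KV<String, byte[]> pairs are placeholders, not my exact code:

// Assumes ComputedMetrics has already been serialized into
// KV<String, byte[]> pairs (e.g. topic -> serialized metrics).
serializedMetrics.apply(KafkaIO.<String, byte[]>write()
    .withBootstrapServers(brokers)
    .withTopic("computed-metrics") // placeholder output topic
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(ByteArraySerializer.class));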