yelianevich commented on issue #31085:
URL: https://github.com/apache/beam/issues/31085#issuecomment-2126656952
> I don't know if there are any workarounds, as the described behavior seems to be (unknown) bug. It needs further investigation. Could you please provide a simplified pipeline that is affected by this?
I could easily reproduce it locally with a Confluent Kafka 7.6.0 test container and Flink 1.15.4.
I ran this test with Beam 2.51.0 (the last version that works) and 2.56.0 (where it always fails).
Here is the test case; see the comments in the code.
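```java
import static org.apache.beam.sdk.values.TypeDescriptors.strings;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.AllArgsConstructor;
import org.apache.beam.runners.flink.TestFlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;
import org.junit.jupiter.api.Test;

// Members of a JUnit 5 test class. `kafkaProperties` is a fixture for the
// Kafka test container that is defined elsewhere in that class (not shown).

@Test
void testBeamFromKafkaSourcesIssue() throws Exception {
    // this topic receives data
    String topicFull = "topic-in-1";
    // this topic is empty - the main ingredient to reproduce the issue
    // if I remove it from the input - it works on 2.56.0
    String topicEmpty = "topic-in-2";

    try (AdminClient adminClient =
            KafkaAdminClient.create(kafkaProperties.buildAdminProperties())) {
        adminClient
            .createTopics(List.of(
                new NewTopic(topicFull, 3, (short) 1),
                new NewTopic(topicEmpty, 3, (short) 1)))
            .all()
            .get(5, TimeUnit.SECONDS);
    }

    try (KafkaProducer<String, String> producer =
            new KafkaProducer<>(kafkaProperties.buildProducerProperties())) {
        producer.send(new ProducerRecord<>(topicFull, 0, null, "payload-0")).get();
        producer.send(new ProducerRecord<>(topicFull, 1, null, "payload-11")).get();
        producer.send(new ProducerRecord<>(topicFull, 2, null, "payload-222")).get();
        producer.send(new ProducerRecord<>(topicFull, 0, null, "payload-0")).get();
    }

    PipelineOptions opts = PipelineOptionsFactory.create();
    opts.setRunner(TestFlinkRunner.class);
    Pipeline pipeline = Pipeline.create(opts);
    String bootstrapServers = String.join(",", kafkaProperties.getBootstrapServers());

    PCollection<KafkaRecord<String, String>> readFullTopic =
        pipeline.apply("ReadTopic1", createReader(topicFull, bootstrapServers));
    PCollection<KafkaRecord<String, String>> readEmptyTopic =
        pipeline.apply("ReadTopic2", createReader(topicEmpty, bootstrapServers));

    PCollectionList.of(List.of(readFullTopic, readEmptyTopic))
        .apply("Flatten", Flatten.pCollections())
        .apply("ToString", MapElements.into(strings()).via(r -> r.getKV().getValue()))
        .apply("LogInput", ParDo.of(LogContext.of("Input")))
        .apply("Window", Window.into(FixedWindows.of(Duration.standardSeconds(3))))
        .apply("Count", Count.perElement())
        .apply("LogOutput", ParDo.of(LogContext.of("Counts")));

    pipeline.run();
}

private static KafkaIO.Read<String, String> createReader(
        String kafkaTopic, String bootstrapServers) {
    return KafkaIO.<String, String>read()
        .withBootstrapServers(bootstrapServers)
        .withTopic(kafkaTopic)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withConsumerConfigUpdates(Map.of(AUTO_OFFSET_RESET_CONFIG, "earliest"));
}

@AllArgsConstructor(staticName = "of")
static class LogContext<T> extends DoFn<T, T> {
    private final String prefix;

    @ProcessElement
    public void processElement(ProcessContext c) {
        System.out.printf("%s: Element: %s, pane: %s, ts: %s%n",
            prefix, c.element(), c.pane(), c.timestamp());
        c.output(c.element());
    }
}
```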
Output from `LogContext`:

2.56.0 (never fires a window, never outputs counts):
```
Input: Element: payload-0, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:31:08.674Z
Input: Element: payload-11, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:31:08.695Z
Input: Element: payload-222, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:31:08.696Z
Input: Element: payload-0, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:31:08.696Z
```
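A fixed window fires only once the watermark passes its end, and the output watermark of a `Flatten` cannot advance beyond the slowest of its inputs. If the reader of the empty topic never advanced its watermark, the `Count` windows would never fire, which would be consistent with the 2.56.0 output above; this is an assumed mechanism, not a confirmed root cause. The toy model below illustrates the effect for the window `[09:31:06, 09:31:09)` that contains all four 2.56.0 elements. The class `WatermarkHoldSketch` and both watermark values are hypothetical; this is not Beam or Flink code.

```java
import java.time.Instant;
import java.util.List;

// Hypothetical, simplified model (not Beam or Flink code) of watermark
// propagation through a Flatten of two Kafka reads into a fixed window.
class WatermarkHoldSketch {

    // A multi-input operator such as Flatten emits the minimum of its input
    // watermarks, so the slowest input holds everything downstream back.
    static Instant combinedWatermark(List<Instant> inputWatermarks) {
        return inputWatermarks.stream().min(Instant::compareTo).orElseThrow();
    }

    // In this simplified model, a window fires once the watermark reaches its end.
    static boolean windowFires(Instant windowEnd, Instant watermark) {
        return !watermark.isBefore(windowEnd);
    }

    public static void main(String[] args) {
        // Window [09:31:06, 09:31:09) holds all four elements of the 2.56.0 run.
        Instant windowEnd = Instant.parse("2024-05-23T09:31:09Z");
        // Assumed value: the topic with data has advanced past the window end.
        Instant fullTopicWatermark = Instant.parse("2024-05-23T09:31:20Z");
        // Assumed value: the empty topic's watermark stays at a minimum,
        // analogous to Beam's BoundedWindow.TIMESTAMP_MIN_VALUE.
        Instant emptyTopicWatermark = Instant.MIN;

        Instant onlyFull = combinedWatermark(List.of(fullTopicWatermark));
        Instant both = combinedWatermark(List.of(fullTopicWatermark, emptyTopicWatermark));

        System.out.println("full topic only fires: " + windowFires(windowEnd, onlyFull));
        System.out.println("full + empty fires:    " + windowFires(windowEnd, both));
    }
}
```

Running `main` prints `true` for the topic with data alone and `false` once the stalled empty-topic watermark is part of the minimum.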
2.51.0 (expected):
```
Input: Element: payload-0, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:26:43.119Z
Input: Element: payload-11, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:26:43.140Z
Input: Element: payload-222, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:26:43.141Z
Input: Element: payload-0, pane: PaneInfo.NO_FIRING, ts: 2024-05-23T09:26:43.141Z
Counts: Element: KV{payload-0, 2}, pane: PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}, ts: 2024-05-23T09:26:44.999Z
Counts: Element: KV{payload-222, 1}, pane: PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}, ts: 2024-05-23T09:26:44.999Z
Counts: Element: KV{payload-11, 1}, pane: PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}, ts: 2024-05-23T09:26:44.999Z
```
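The `Counts` timestamps in the 2.51.0 output can be checked by hand. `FixedWindows.of(Duration.standardSeconds(3))` assigns epoch-aligned 3-second windows, and with Beam's default `TimestampCombiner` (`END_OF_WINDOW`) each count carries the window's last timestamp, i.e. the window end minus 1 ms. The sketch below re-derives this using only `java.time`; `FixedWindowSketch` is a hypothetical helper that assumes a zero window offset and is a simplified stand-in for Beam's own window assignment, not its implementation.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: a simplified, java.time-only model of epoch-aligned
// fixed windows with zero offset (not Beam's FixedWindows implementation).
class FixedWindowSketch {

    // Start of the window containing ts: round down to a multiple of the size since the epoch.
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long tsMs = ts.toEpochMilli();
        return Instant.ofEpochMilli(tsMs - Math.floorMod(tsMs, sizeMs));
    }

    // Last timestamp inside [start, start + size): the window end minus one millisecond.
    static Instant windowMaxTimestamp(Instant ts, Duration size) {
        return windowStart(ts, size).plus(size).minusMillis(1);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(3);
        // Element timestamps copied from the 2.51.0 "Input" lines.
        String[] inputs = {
            "2024-05-23T09:26:43.119Z", "2024-05-23T09:26:43.140Z",
            "2024-05-23T09:26:43.141Z", "2024-05-23T09:26:43.141Z"
        };
        for (String s : inputs) {
            Instant ts = Instant.parse(s);
            Instant start = windowStart(ts, size);
            System.out.println(ts + " -> [" + start + ", " + start.plus(size)
                + "), last timestamp " + windowMaxTimestamp(ts, size));
        }
    }
}
```

All four input timestamps fall in `[09:26:42, 09:26:45)`, whose last timestamp is `09:26:44.999Z`, the value on every `Counts` line.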