oemergenc opened a new issue, #29971:
URL: https://github.com/apache/beam/issues/29971
### What happened?
Hi,
I am trying to read several topics from Kafka using a pipeline like the
following. The pipeline runs on Google Dataflow.
```
// Define pipeline
pipeline
    .apply(
        "ReadFromKafka",
        KafkaIO.<String, String>read()
            .withBootstrapServers(options.getBootstrapServers())
            .withTopics(KafkaUtils.getKafkaTopics(options.getInputTopics()))
            .withCreateTime(Duration.standardMinutes(options.getMaxDelayInMinutes()))
            .withReadCommitted()
            .commitOffsetsInFinalize()
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withConsumerConfigUpdates(KafkaUtils.parseConsumerConfig(options, secretService)))
    .apply(
        "CreateWindows",
        Window.<KafkaRecord<String, String>>into(FixedWindows.of(Duration.standardHours(1)))
            .triggering(
                Repeatedly.forever(
                    AfterWatermark.pastEndOfWindow()
                        .withLateFirings(
                            AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardMinutes(10)))))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.standardMinutes(1))
            .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY))
    .apply("WriteFileOutput", createWriteGenericRecordTransform(options));
```
The pipeline should write the KafkaRecords out in hourly windows and also
handle late data. For small Kafka topics everything seems to work. However, if
the topic is rather large (offset of ~5 million, about 5 GB in size), the
pipeline seems to hang and nothing happens.
Is there something wrong with my implementation? If I adjust my window to
something like this, using early panes, everything seems to work, but it does
not feel correct:
```
Window.<T>into(FixedWindows.of(Duration.standardHours(1)))
    .triggering(
        AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(
                AfterProcessingTime.pastFirstElementInPane()
                    .alignedTo(Duration.standardHours(1)))
            .withLateFirings(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(10))))
    .discardingFiredPanes()
    .withAllowedLateness(allowedLateness())
    .withOnTimeBehavior(OnTimeBehavior.FIRE_IF_NON_EMPTY));
```
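For reference, here is a minimal plain-Java sketch of the windowing semantics both variants rely on: how an event timestamp maps to a one-hour fixed window, and when that window stops accepting late data under a one-minute allowed lateness. This is not Beam API. The class and method names (`WindowSketch`, `windowStart`, `isExpired`) are hypothetical, the timestamps are made up, and the expiry check is a simplification that ignores millisecond boundary details.

```java
import java.time.Duration;
import java.time.Instant;

/** Illustrative only: approximates fixed-window assignment and window expiry. */
public class WindowSketch {

    /** Start of the epoch-aligned fixed window of the given size that contains ts. */
    public static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long tsMs = ts.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch as well.
        return Instant.ofEpochMilli(tsMs - Math.floorMod(tsMs, sizeMs));
    }

    /**
     * True once the watermark has moved past the window end plus the allowed
     * lateness (simplified assumption). After that point, late elements for
     * this window would be dropped.
     */
    public static boolean isExpired(
            Instant windowStart, Duration size, Duration allowedLateness, Instant watermark) {
        Instant windowEnd = windowStart.plus(size);
        return watermark.isAfter(windowEnd.plus(allowedLateness));
    }

    public static void main(String[] args) {
        Duration size = Duration.ofHours(1);        // matches FixedWindows.of(standardHours(1))
        Duration lateness = Duration.ofMinutes(1);  // matches withAllowedLateness(standardMinutes(1))

        // Hypothetical record create time.
        Instant eventTime = Instant.parse("2024-01-10T09:42:00Z");
        Instant start = windowStart(eventTime, size);
        System.out.println("window start: " + start);

        // Watermark 30 seconds past the window end: still within allowed lateness.
        Instant nearWatermark = Instant.parse("2024-01-10T10:00:30Z");
        System.out.println("expired at 10:00:30? " + isExpired(start, size, lateness, nearWatermark));

        // Watermark 5 minutes past the window end: the window has expired.
        Instant farWatermark = Instant.parse("2024-01-10T10:05:00Z");
        System.out.println("expired at 10:05:00? " + isExpired(start, size, lateness, farWatermark));
    }
}
```

Under this simplified model, a record that arrives more than one minute of watermark time after its hourly window has closed is no longer written out, regardless of which trigger is configured.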
Any help would be greatly appreciated.
### Issue Priority
Priority: 1 (data loss / total loss of function)
### Issue Components
- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [X] Component: Google Cloud Dataflow Runner