[ https://issues.apache.org/jira/browse/FLINK-34400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815196#comment-17815196 ]
Alexis Sarda-Espinosa commented on FLINK-34400: ----------------------------------------------- Hi [~fanrui], yes I'm testing Flink 1.18.1 version right now. I also saw that exception in the logs, but I definitely find it odd. If I disable alignment, the Flink job works without issues. Additionally, the source has a parallelism of 2, and only 1 of the instances shows issues, the other one continues consuming and committing offsets normally (see the attached graph which corresponds to Strimzi metrics, the yellow line is for the second topic in the alignment group that remains empty). Also, we have a lot of components consuming from different topics in the same Kafka cluster, and only the Flink job shows issues when alignment is enabled. !alignment_lags.png! I unfortunately can't share the whole code, but we're not doing anything special with the Kafka connector, both sources are instantiated like this: {noformat} KafkaSource.<GenericChangeMessage>builder() .setBootstrapServers(config.internalBroker) .setTopics(config.someTopic) .setGroupId(config.internalBrokerGroupId) .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) .setDeserializer(new ...Deserializer()) // this changes between sources of course .setProperties(config.getInternalBrokerProperties()) .setClientIdPrefix("...") .build() {noformat} and it really seems to be a problem at the source - if I look at the {{numRecordsOutPerSecond}} metric, it's the source itself that reports 0/s (for once of the parallel streams). In case it's relevant, the watermark strategy is either {{noWatermarks}} or {noformat} WatermarkStrategy.<T>forBoundedOutOfOrderness(maxAllowedWatermarkDrift) // 1 second .withWatermarkAlignment(alignmentGroup, maxAllowedWatermarkDrift, Duration.ofSeconds(1L)) {noformat} Out Kafka cluster has 3 brokers and 30 partitions per topic. I think that if I add {{.withIdleness(...)}} to the watermark strategy, it also works fine, but I will do some more testing. > Kafka sources with watermark alignment sporadically stop consuming > ------------------------------------------------------------------ > > Key: FLINK-34400 > URL: https://issues.apache.org/jira/browse/FLINK-34400 > Project: Flink > Issue Type: Bug > Affects Versions: 1.18.1 > Reporter: Alexis Sarda-Espinosa > Priority: Major > Attachments: alignment_lags.png, logs.txt > > > I have 2 Kafka sources that read from different topics. I have assigned them > to the same watermark alignment group, and I have _not_ enabled idleness > explicitly in their watermark strategies. One topic remains pretty much empty > most of the time, while the other receives a few events per second all the > time. Parallelism of the active source is 2, for the other one it's 1, and > checkpoints are once every minute. > This works correctly for some time (10 - 15 minutes in my case) but then 1 of > the active sources stops consuming, which causes lag to increase. Weirdly, > after another 15 minutes or so, all the backlog is consumed at once, and then > everything stops again. > I'm attaching some logs from the Task Manager where the issue appears. You > will notice that the Kafka network client reports disconnections (a long time > after the deserializer stopped reporting that events were being consumed), > I'm not sure if this is related. -- This message was sent by Atlassian Jira (v8.20.10#820010)