tomstepp commented on code in PR #37510:
URL: https://github.com/apache/beam/pull/37510#discussion_r2817706668
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -611,37 +615,61 @@ private void consumerPollLoop() {
   private void commitCheckpointMark() {
     KafkaCheckpointMark checkpointMark = finalizedCheckpointMark.getAndSet(null);
-    if (checkpointMark != null) {
-      LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
-      Consumer<byte[], byte[]> consumer = Preconditions.checkStateNotNull(this.consumer);
-      Instant now = Instant.now();
+    if (checkpointMark == null) {
+      return;
+    }
-      try {
-        consumer.commitSync(
-            checkpointMark.getPartitions().stream()
-                .filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
-                .collect(
-                    Collectors.toMap(
-                        p -> new TopicPartition(p.getTopic(), p.getPartition()),
-                        p -> new OffsetAndMetadata(p.getNextOffset()))));
-        nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
-      } catch (Exception e) {
-        // Log but ignore the exception. Committing consumer offsets to Kafka is not critical for
-        // KafkaIO because it relies on the offsets stored in KafkaCheckpointMark.
-        if (now.isAfter(nextAllowedCommitFailLogTime)) {
-          LOG.warn(
-              String.format(
-                  "%s: Did not successfully commit finalized checkpoint for > %s. Current checkpoint: %s",
-                  this, MIN_COMMIT_FAIL_LOG_INTERVAL, checkpointMark),
-              e);
-          nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
-        } else {
-          LOG.info(
-              String.format(
-                  "%s: Could not commit finalized checkpoint. Commit will be retried with subsequent reads. Current checkpoint: %s",
-                  this, checkpointMark),
-              e);
+    LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
+    Consumer<byte[], byte[]> consumer = Preconditions.checkStateNotNull(this.consumer);
+    Instant now = Instant.now();
+
+    try {
+      // Commit only partitions whose offsets have advanced since the last successful commit
+      // for this reader. This suppresses no-op commits for idle partitions.
+      Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
+      for (KafkaCheckpointMark.PartitionMark p : checkpointMark.getPartitions()) {
+        long next = p.getNextOffset();
+        if (next == UNINITIALIZED_OFFSET) {
+          continue;
         }
+
+        TopicPartition tp = new TopicPartition(p.getTopic(), p.getPartition());
+        Long prev = lastCommittedOffsets.get(tp);
+
+        if (prev == null || next > prev) {
+          toCommit.put(tp, new OffsetAndMetadata(next));
+        }
+      }
+
Review Comment:
Maybe we could add a debug log for the number of idle partitions skipped?
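Roughly something like this (a self-contained sketch, not the Beam code: it uses plain `String` keys in place of `TopicPartition`, and `filterAdvanced` is a hypothetical stand-in for the loop in the diff):

```java
import java.util.HashMap;
import java.util.Map;

public class IdleCommitSketch {
  // Keeps only offsets that advanced past the last committed offset; the
  // caller can then log (checkpoint size - result size) as the idle count.
  static Map<String, Long> filterAdvanced(
      Map<String, Long> lastCommitted, Map<String, Long> checkpointOffsets) {
    Map<String, Long> toCommit = new HashMap<>();
    for (Map.Entry<String, Long> e : checkpointOffsets.entrySet()) {
      Long prev = lastCommitted.get(e.getKey());
      if (prev == null || e.getValue() > prev) {
        toCommit.put(e.getKey(), e.getValue());
      }
    }
    return toCommit;
  }

  public static void main(String[] args) {
    Map<String, Long> last = new HashMap<>();
    last.put("topic-0", 10L);
    last.put("topic-1", 5L);

    Map<String, Long> checkpoint = new HashMap<>();
    checkpoint.put("topic-0", 10L); // idle: offset unchanged since last commit
    checkpoint.put("topic-1", 7L);  // advanced: will be committed
    checkpoint.put("topic-2", 3L);  // never committed before: will be committed

    Map<String, Long> toCommit = filterAdvanced(last, checkpoint);
    int idle = checkpoint.size() - toCommit.size();
    // The suggested debug line might then look like:
    // LOG.debug("{}: Skipping commit for {} idle partition(s)", this, idle);
    System.out.println("committing=" + toCommit.size() + " idle=" + idle);
  }
}
```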
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -79,6 +80,9 @@
  */
 class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
+  // Track last successfully committed offsets to suppress no-op commits for idle partitions.
+  private final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<>();
Review Comment:
Maybe we can also track the last commit time per partition? We could commit
anyway if a partition has been idle for longer than some interval (10 minutes,
for example).
I think this would also help customers who use time-lag monitoring (tracking
time since the last commit).
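Roughly what I have in mind (a sketch, not Beam code: `IDLE_COMMIT_INTERVAL_MS` and `shouldCommit` are hypothetical names, and the real interval would presumably be configurable):

```java
public class TimedCommitSketch {
  // Hypothetical interval: re-commit an idle partition after 10 minutes.
  static final long IDLE_COMMIT_INTERVAL_MS = 10 * 60 * 1000L;

  // Commit when the offset advanced, or when the partition has been idle
  // longer than the interval (keeps time-lag monitors from flagging it).
  static boolean shouldCommit(
      Long prevOffset, long nextOffset, Long lastCommitMs, long nowMs) {
    if (prevOffset == null || nextOffset > prevOffset) {
      return true;
    }
    return lastCommitMs == null || nowMs - lastCommitMs >= IDLE_COMMIT_INTERVAL_MS;
  }

  public static void main(String[] args) {
    long now = 1_000_000_000L;
    // Offset unchanged, committed 1 minute ago: skip.
    System.out.println(shouldCommit(42L, 42L, now - 60_000L, now));
    // Offset unchanged, committed 11 minutes ago: re-commit anyway.
    System.out.println(shouldCommit(42L, 42L, now - 660_000L, now));
    // Offset advanced: always commit.
    System.out.println(shouldCommit(42L, 43L, now - 60_000L, now));
  }
}
```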
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
Review Comment:
Can we add a unit test to cover this new behavior?
From a quick search, maybe reuse or model it on
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffsetTest.java
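In plain Java, the key assertion could look like the following (a sketch only: `commitAdvanced` is a hypothetical stand-in for the reader's filtering, and a real test would exercise KafkaUnboundedReader itself, e.g. against a mock consumer, rather than this helper):

```java
import java.util.HashMap;
import java.util.Map;

public class RepeatedCheckpointCommitTest {
  // Stand-in for the reader's commit loop: commits advanced offsets and
  // records them, returning how many partitions were actually committed.
  static int commitAdvanced(Map<String, Long> lastCommitted, Map<String, Long> checkpoint) {
    int committed = 0;
    for (Map.Entry<String, Long> e : checkpoint.entrySet()) {
      Long prev = lastCommitted.get(e.getKey());
      if (prev == null || e.getValue() > prev) {
        lastCommitted.put(e.getKey(), e.getValue());
        committed++;
      }
    }
    return committed;
  }

  public static void main(String[] args) {
    Map<String, Long> lastCommitted = new HashMap<>();
    Map<String, Long> checkpoint = Map.of("t-0", 50L);

    // First finalize: the offset is new, so it must be committed.
    int first = commitAdvanced(lastCommitted, checkpoint);
    // Second finalize with identical offsets: the partition is idle,
    // so nothing should be committed.
    int second = commitAdvanced(lastCommitted, checkpoint);

    if (first != 1 || second != 0) {
      throw new AssertionError("expected first=1 second=0");
    }
    System.out.println("first=" + first + " second=" + second);
  }
}
```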
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]