This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2cdd3f05ba81bd5be9f4d3161a6d3ab47386470a
Author: Arvid Heise <[email protected]>
AuthorDate: Thu Sep 5 13:24:03 2024 +0200

    [FLINK-25920] Improve logging in committable handling of the sink
---
 .../runtime/operators/sink/SinkWriterOperator.java   |  9 +++++++--
 .../CheckpointCommittableManagerImpl.java            | 12 ++++++++++++
 .../sink/committables/SubtaskCommittableManager.java | 20 ++++++++++++++++++++
 3 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index 35d6ca6f7ad..f2b6a533bcd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -289,7 +289,7 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
             int numberOfParallelSubtasks,
             long checkpointId,
             Collection<CommT> committables) {
-        output.collect(
+        emit(
                 new StreamRecord<>(
                         new CommittableSummary<>(
                                 indexOfThisSubtask,
@@ -299,13 +299,18 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
                                 committables.size(),
                                 0)));
         for (CommT committable : committables) {
-            output.collect(
+            emit(
                     new StreamRecord<>(
                             new CommittableWithLineage<>(
                                     committable, checkpointId, indexOfThisSubtask)));
         }
     }
 
+    private void emit(StreamRecord<CommittableMessage<CommT>> message) {
+        LOG.debug("Sending message to committer: {}", message);
+        output.collect(message);
+    }
+
     private WriterInitContext createInitContext(OptionalLong restoredCheckpointId) {
         return new InitContextImpl(
                 getRuntimeContext(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
index a217116055c..9cbe0b16eaa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
@@ -24,6 +24,9 @@ import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
 import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -43,6 +46,9 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa
     private final int numberOfSubtasks;
     private final SinkCommitterMetricGroup metricGroup;
 
+    private static final Logger LOG =
+            LoggerFactory.getLogger(CheckpointCommittableManagerImpl.class);
+
     CheckpointCommittableManagerImpl(
             int subtaskId,
             int numberOfSubtasks,
@@ -82,6 +88,7 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa
             SubtaskCommittableManager<CommT> merged =
                     subtasksCommittableManagers.merge(
                             summary.getSubtaskId(), manager, SubtaskCommittableManager::merge);
+            LOG.debug("Adding EOI summary (new={}, merged={}).", manager, merged);
         } else {
             SubtaskCommittableManager<CommT> existing =
                     subtasksCommittableManagers.putIfAbsent(summary.getSubtaskId(), manager);
@@ -90,6 +97,11 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa
                         String.format(
                                 "Received duplicate committable summary for checkpoint %s + subtask %s (new=%s, old=%s). Please check the status of FLINK-25920",
                                 checkpointId, summary.getSubtaskId(), manager, existing));
+            } else {
+                LOG.debug(
+                        "Setting the summary for checkpointId {} with {}",
+                        this.checkpointId,
+                        manager);
             }
         }
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
index a459930ad68..2421e6fb234 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
@@ -209,4 +209,24 @@ class SubtaskCommittableManager<CommT> {
                 checkpointId,
                 metricGroup);
     }
+
+    @Override
+    public String toString() {
+        return "SubtaskCommittableManager{"
+                + "requests="
+                + requests
+                + ", numExpectedCommittables="
+                + numExpectedCommittables
+                + ", checkpointId="
+                + checkpointId
+                + ", subtaskId="
+                + subtaskId
+                + ", numDrained="
+                + numDrained
+                + ", numFailed="
+                + numFailed
+                + ", metricGroup="
+                + metricGroup
+                + '}';
+    }
 }

Reply via email to