This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2cdd3f05ba81bd5be9f4d3161a6d3ab47386470a Author: Arvid Heise <[email protected]> AuthorDate: Thu Sep 5 13:24:03 2024 +0200 [FLINK-25920] Improve logging in committable handling of the sink --- .../runtime/operators/sink/SinkWriterOperator.java | 9 +++++++-- .../CheckpointCommittableManagerImpl.java | 12 ++++++++++++ .../sink/committables/SubtaskCommittableManager.java | 20 ++++++++++++++++++++ 3 files changed, 39 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java index 35d6ca6f7ad..f2b6a533bcd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java @@ -289,7 +289,7 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab int numberOfParallelSubtasks, long checkpointId, Collection<CommT> committables) { - output.collect( + emit( new StreamRecord<>( new CommittableSummary<>( indexOfThisSubtask, @@ -299,13 +299,18 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab committables.size(), 0))); for (CommT committable : committables) { - output.collect( + emit( new StreamRecord<>( new CommittableWithLineage<>( committable, checkpointId, indexOfThisSubtask))); } } + private void emit(StreamRecord<CommittableMessage<CommT>> message) { + LOG.debug("Sending message to committer: {}", message); + output.collect(message); + } + private WriterInitContext createInitContext(OptionalLong restoredCheckpointId) { return new InitContextImpl( getRuntimeContext(), diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java index a217116055c..9cbe0b16eaa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java @@ -24,6 +24,9 @@ import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -43,6 +46,9 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa private final int numberOfSubtasks; private final SinkCommitterMetricGroup metricGroup; + private static final Logger LOG = + LoggerFactory.getLogger(CheckpointCommittableManagerImpl.class); + CheckpointCommittableManagerImpl( int subtaskId, int numberOfSubtasks, @@ -82,6 +88,7 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa SubtaskCommittableManager<CommT> merged = subtasksCommittableManagers.merge( summary.getSubtaskId(), manager, SubtaskCommittableManager::merge); + LOG.debug("Adding EOI summary (new={}}, merged={}}).", manager, merged); } else { SubtaskCommittableManager<CommT> existing = subtasksCommittableManagers.putIfAbsent(summary.getSubtaskId(), manager); @@ -90,6 +97,11 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa String.format( "Received duplicate committable summary for checkpoint %s + subtask %s (new=%s, old=%s). Please check the status of FLINK-25920", checkpointId, summary.getSubtaskId(), manager, existing)); + } else { + LOG.debug( + "Setting the summary for checkpointId {} with {}", + this.checkpointId, + manager); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java index a459930ad68..2421e6fb234 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java @@ -209,4 +209,24 @@ class SubtaskCommittableManager<CommT> { checkpointId, metricGroup); } + + @Override + public String toString() { + return "SubtaskCommittableManager{" + + "requests=" + + requests + + ", numExpectedCommittables=" + + numExpectedCommittables + + ", checkpointId=" + + checkpointId + + ", subtaskId=" + + subtaskId + + ", numDrained=" + + numDrained + + ", numFailed=" + + numFailed + + ", metricGroup=" + + metricGroup + + '}'; + } }
