This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 833c4d25c884a2657fdf4bbdf4127a44faede7ae Author: ifndef-SleePy <[email protected]> AuthorDate: Sun Jan 22 23:26:51 2023 +0800 [FLINK-30755][connector] Support getting attempt number from Sink context --- .../flink/connector/base/sink/writer/TestSinkInitContext.java | 5 +++++ .../org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java | 5 +++++ .../src/main/java/org/apache/flink/api/connector/sink2/Sink.java | 7 +++++++ .../flink/streaming/runtime/operators/sink/SinkWriterOperator.java | 5 +++++ .../org/apache/flink/streaming/api/functions/PrintSinkTest.java | 5 +++++ 5 files changed, 27 insertions(+) diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java index b1461903fc8..3e7d1c159aa 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java @@ -122,6 +122,11 @@ public class TestSinkInitContext implements Sink.InitContext { return 0; } + @Override + public int getAttemptNumber() { + return 0; + } + @Override public SinkWriterMetricGroup metricGroup() { return metricGroup; diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java index 8dfa5f6e534..98f64ed138f 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java @@ -474,6 +474,11 @@ public class KafkaWriterITCase { return 1; } + @Override + public int getAttemptNumber() { + return 0; + } + @Override public SinkWriterMetricGroup metricGroup() { return metricGroup; diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java index c006ba5c12b..58bd1a1dd94 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java @@ -95,6 +95,13 @@ public interface Sink<InputT> extends Serializable { /** @return The number of parallel Sink tasks. */ int getNumberOfParallelSubtasks(); + /** + * Gets the attempt number of this parallel subtask. First attempt is numbered 0. + * + * @return Attempt number of the subtask. + */ + int getAttemptNumber(); + /** @return The metric group this writer belongs to. */ SinkWriterMetricGroup metricGroup(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java index e593616abf7..7646584e28f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java @@ -313,6 +313,11 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab return runtimeContext.getNumberOfParallelSubtasks(); } + @Override + public int getAttemptNumber() { + return runtimeContext.getAttemptNumber(); + } + @Override public MailboxExecutor getMailboxExecutor() { return mailboxExecutor; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java index 2a86e20fcdd..69d8e4af968 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java @@ -191,6 +191,11 @@ class PrintSinkTest { return numSubtasks; } + @Override + public int getAttemptNumber() { + return 0; + } + @Override public SinkWriterMetricGroup metricGroup() { return InternalSinkWriterMetricGroup.mock(new UnregisteredMetricsGroup());
