This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1131e0582b5a50d48ae64cb2120e304b2aac9bcb Author: fangliang <[email protected]> AuthorDate: Wed Oct 28 09:08:38 2020 +0800 [FLINK-18044][Connectors/Common] Add the subtask index information to the SourceReaderContext. (#12647) --- .../org/apache/flink/api/connector/source/SourceReaderContext.java | 5 +++++ .../flink/api/connector/source/mocks/TestingReaderContext.java | 5 +++++ .../org/apache/flink/streaming/api/operators/SourceOperator.java | 5 +++++ 3 files changed, 15 insertions(+) diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java index c8ed36d..4d9e53a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java @@ -45,6 +45,11 @@ public interface SourceReaderContext { String getLocalHostName(); /** + * @return The index of this subtask. + */ + int getIndexOfSubtask(); + + /** * Send a source event to the source coordinator. * * @param sourceEvent the source event to coordinator. diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/TestingReaderContext.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/TestingReaderContext.java index cee4654..3f254e4 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/TestingReaderContext.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/TestingReaderContext.java @@ -64,6 +64,11 @@ public class TestingReaderContext implements SourceReaderContext { } @Override + public int getIndexOfSubtask() { + return 0; + } + + @Override public void sendSourceEventToCoordinator(SourceEvent sourceEvent) { sentEvents.add(sourceEvent); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 05b3216..d925b23 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -151,6 +151,11 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> } @Override + public int getIndexOfSubtask() { + return getRuntimeContext().getIndexOfThisSubtask(); + } + + @Override public void sendSourceEventToCoordinator(SourceEvent event) { operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event)); }
