This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1131e0582b5a50d48ae64cb2120e304b2aac9bcb
Author: fangliang <[email protected]>
AuthorDate: Wed Oct 28 09:08:38 2020 +0800

    [FLINK-18044][Connectors/Common] Add the subtask index information to the 
SourceReaderContext. (#12647)
---
 .../org/apache/flink/api/connector/source/SourceReaderContext.java   | 5 +++++
 .../flink/api/connector/source/mocks/TestingReaderContext.java       | 5 +++++
 .../org/apache/flink/streaming/api/operators/SourceOperator.java     | 5 +++++
 3 files changed, 15 insertions(+)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
index c8ed36d..4d9e53a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
@@ -45,6 +45,11 @@ public interface SourceReaderContext {
        String getLocalHostName();
 
        /**
+        * @return The index of this subtask.
+        */
+       int getIndexOfSubtask();
+
+       /**
         * Send a source event to the source coordinator.
         *
         * @param sourceEvent the source event to coordinator.
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/TestingReaderContext.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/TestingReaderContext.java
index cee4654..3f254e4 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/TestingReaderContext.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/TestingReaderContext.java
@@ -64,6 +64,11 @@ public class TestingReaderContext implements 
SourceReaderContext {
        }
 
        @Override
+       public int getIndexOfSubtask() {
+               return 0;
+       }
+
+       @Override
        public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
                sentEvents.add(sourceEvent);
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 05b3216..d925b23 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -151,6 +151,11 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit>
                        }
 
                        @Override
+                       public int getIndexOfSubtask() {
+                               return 
getRuntimeContext().getIndexOfThisSubtask();
+                       }
+
+                       @Override
                        public void sendSourceEventToCoordinator(SourceEvent 
event) {
                                operatorEventGateway.sendEventToCoordinator(new 
SourceEventWrapper(event));
                        }

Reply via email to