This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7aa51ea1bf856be429edb8ba7c9b7dfc137a785c
Author: Stephan Ewen <[email protected]>
AuthorDate: Wed Aug 26 15:50:16 2020 +0200

    [FLINK-19205][core] Add access to configuration and hostname in the SourceReaderContext
---
 .../api/connector/source/SourceReaderContext.java  | 12 ++++++++++
 .../streaming/api/operators/SourceOperator.java    | 28 ++++++++++++++++++----
 .../api/operators/SourceOperatorFactory.java       | 13 +++++++---
 .../operators/source/TestingSourceOperator.java    |  5 +++-
 4 files changed, 50 insertions(+), 8 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
index 658034b..c8ed36d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.connector.source;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
 
 /**
@@ -33,6 +34,17 @@ public interface SourceReaderContext {
        MetricGroup metricGroup();
 
        /**
+        * Gets the configuration with which Flink was started.
+        */
+       Configuration getConfiguration();
+
+       /**
+        * Gets the hostname of the machine where this reader is executed. This can be used
+        * to request splits local to the machine, if needed.
+        */
+       String getLocalHostName();
+
+       /**
         * Send a source event to the source coordinator.
         *
         * @param sourceEvent the source event to coordinator.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 33e95f2..1af572c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.MetricGroup;
@@ -65,7 +66,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * @param <OUT> The output type of the operator.
  */
 @Internal
-@SuppressWarnings("serial")
+//@SuppressWarnings("serial")
 public class SourceOperator<OUT, SplitT extends SourceSplit>
                extends AbstractStreamOperator<OUT>
                implements OperatorEventHandler, PushingAsyncDataInput<OUT> {
@@ -89,6 +90,12 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
        /** The factory for timestamps and watermark generators. */
        private final WatermarkStrategy<OUT> watermarkStrategy;
 
+       /** The Flink configuration. */
+       private final Configuration configuration;
+
+       /** Host name of the machine where the operator runs, to support locality aware work assignment. */
+       private final String localHostname;
+
        // ---- lazily initialized fields (these fields are the "hot" fields) ----
 
        /** The source reader that does most of the work. */
@@ -110,13 +117,17 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
                        OperatorEventGateway operatorEventGateway,
                        SimpleVersionedSerializer<SplitT> splitSerializer,
                        WatermarkStrategy<OUT> watermarkStrategy,
-                       ProcessingTimeService timeService) {
+                       ProcessingTimeService timeService,
+                       Configuration configuration,
+                       String localHostname) {
 
                this.readerFactory = checkNotNull(readerFactory);
                this.operatorEventGateway = checkNotNull(operatorEventGateway);
                this.splitSerializer = checkNotNull(splitSerializer);
                this.watermarkStrategy = checkNotNull(watermarkStrategy);
                this.processingTimeService = timeService;
+               this.configuration = checkNotNull(configuration);
+               this.localHostname = checkNotNull(localHostname);
        }
 
        @Override
@@ -130,6 +141,16 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
                        }
 
                        @Override
+                       public Configuration getConfiguration() {
+                               return configuration;
+                       }
+
+                       @Override
+                       public String getLocalHostName() {
+                               return localHostname;
+                       }
+
+                       @Override
                        public void sendSourceEventToCoordinator(SourceEvent event) {
                                operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event));
                        }
@@ -219,8 +240,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 
        private void registerReader() {
                operatorEventGateway.sendEventToCoordinator(new ReaderRegistrationEvent(
-                               getRuntimeContext().getIndexOfThisSubtask(),
-                               "UNKNOWN_LOCATION"));
+                               getRuntimeContext().getIndexOfThisSubtask(), localHostname));
        }
 
        // --------------- methods for unit tests ------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index d1636cd..1d7aed7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
@@ -75,7 +76,9 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
                                gateway,
                                source.getSplitSerializer(),
                                watermarkStrategy,
-                               parameters.getProcessingTimeService());
+                               parameters.getProcessingTimeService(),
+                               parameters.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(),
+                               parameters.getContainingTask().getEnvironment().getTaskManagerInfo().getTaskManagerExternalAddress());
 
                sourceOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
                parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, sourceOperator);
@@ -115,7 +118,9 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
                        OperatorEventGateway eventGateway,
                        SimpleVersionedSerializer<?> splitSerializer,
                        WatermarkStrategy<T> watermarkStrategy,
-                       ProcessingTimeService timeService) {
+                       ProcessingTimeService timeService,
+                       Configuration config,
+                       String localHostName) {
 
                // jumping through generics hoops: cast the generics away to then cast them back more strictly typed
                final Function<SourceReaderContext, SourceReader<T, SplitT>> typedReaderFactory =
@@ -128,6 +133,8 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
                                eventGateway,
                                typedSplitSerializer,
                                watermarkStrategy,
-                               timeService);
+                               timeService,
+                               config,
+                               localHostName);
        }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
index 038a211..a133ace 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
@@ -71,7 +72,9 @@ public class TestingSourceOperator<T>  extends SourceOperator<T, MockSourceSplit
                        eventGateway,
                        new MockSourceSplitSerializer(),
                        watermarkStrategy,
-                       timeService);
+                       timeService,
+                       new Configuration(),
+                       "localhost");
 
                this.subtaskIndex = subtaskIndex;
                this.parallelism = parallelism;

Reply via email to