This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7aa51ea1bf856be429edb8ba7c9b7dfc137a785c Author: Stephan Ewen <[email protected]> AuthorDate: Wed Aug 26 15:50:16 2020 +0200 [FLINK-19205][core] Add access to configuration and hostname in the SourceReaderContext --- .../api/connector/source/SourceReaderContext.java | 12 ++++++++++ .../streaming/api/operators/SourceOperator.java | 28 ++++++++++++++++++---- .../api/operators/SourceOperatorFactory.java | 13 +++++++--- .../operators/source/TestingSourceOperator.java | 5 +++- 4 files changed, 50 insertions(+), 8 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java index 658034b..c8ed36d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java @@ -19,6 +19,7 @@ package org.apache.flink.api.connector.source; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.MetricGroup; /** @@ -33,6 +34,17 @@ public interface SourceReaderContext { MetricGroup metricGroup(); /** + * Gets the configuration with which Flink was started. + */ + Configuration getConfiguration(); + + /** + * Gets the hostname of the machine where this reader is executed. This can be used + * to request splits local to the machine, if needed. + */ + String getLocalHostName(); + + /** * Send a source event to the source coordinator. * * @param sourceEvent the source event to coordinator. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 33e95f2..1af572c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -28,6 +28,7 @@ import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputStatus; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.MetricGroup; @@ -65,7 +66,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * @param <OUT> The output type of the operator. */ @Internal -@SuppressWarnings("serial") +//@SuppressWarnings("serial") public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStreamOperator<OUT> implements OperatorEventHandler, PushingAsyncDataInput<OUT> { @@ -89,6 +90,12 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> /** The factory for timestamps and watermark generators. */ private final WatermarkStrategy<OUT> watermarkStrategy; + /** The Flink configuration. */ + private final Configuration configuration; + + /** Host name of the machine where the operator runs, to support locality aware work assignment. */ + private final String localHostname; + // ---- lazily initialized fields (these fields are the "hot" fields) ---- /** The source reader that does most of the work. */ @@ -110,13 +117,17 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> OperatorEventGateway operatorEventGateway, SimpleVersionedSerializer<SplitT> splitSerializer, WatermarkStrategy<OUT> watermarkStrategy, - ProcessingTimeService timeService) { + ProcessingTimeService timeService, + Configuration configuration, + String localHostname) { this.readerFactory = checkNotNull(readerFactory); this.operatorEventGateway = checkNotNull(operatorEventGateway); this.splitSerializer = checkNotNull(splitSerializer); this.watermarkStrategy = checkNotNull(watermarkStrategy); this.processingTimeService = timeService; + this.configuration = checkNotNull(configuration); + this.localHostname = checkNotNull(localHostname); } @Override @@ -130,6 +141,16 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> } @Override + public Configuration getConfiguration() { + return configuration; + } + + @Override + public String getLocalHostName() { + return localHostname; + } + + @Override public void sendSourceEventToCoordinator(SourceEvent event) { operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event)); } @@ -219,8 +240,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> private void registerReader() { operatorEventGateway.sendEventToCoordinator(new ReaderRegistrationEvent( - getRuntimeContext().getIndexOfThisSubtask(), - "UNKNOWN_LOCATION")); + getRuntimeContext().getIndexOfThisSubtask(), localHostname)); } // --------------- methods for unit tests ------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java index d1636cd..1d7aed7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java @@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.Source; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; @@ -75,7 +76,9 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU gateway, source.getSplitSerializer(), watermarkStrategy, - parameters.getProcessingTimeService()); + parameters.getProcessingTimeService(), + parameters.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(), + parameters.getContainingTask().getEnvironment().getTaskManagerInfo().getTaskManagerExternalAddress()); sourceOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, sourceOperator); @@ -115,7 +118,9 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU OperatorEventGateway eventGateway, SimpleVersionedSerializer<?> splitSerializer, WatermarkStrategy<T> watermarkStrategy, - ProcessingTimeService timeService) { + ProcessingTimeService timeService, + Configuration config, + String localHostName) { // jumping through generics hoops: cast the generics away to then cast them back more strictly typed final Function<SourceReaderContext, SourceReader<T, SplitT>> typedReaderFactory = @@ -128,6 +133,8 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU eventGateway, typedSplitSerializer, watermarkStrategy, - timeService); + timeService, + config, + localHostName); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java index 038a211..a133ace 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; @@ -71,7 +72,9 @@ public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit eventGateway, new MockSourceSplitSerializer(), watermarkStrategy, - timeService); + timeService, + new Configuration(), + "localhost"); this.subtaskIndex = subtaskIndex; this.parallelism = parallelism;
