This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit cac1bb51f3f7a39b71cfc016e013e45ad6a1b0f9 Author: Stephan Ewen <[email protected]> AuthorDate: Mon Nov 23 21:26:52 2020 +0100 [FLINK-20270][refactor] Initialize reader in SourceOperator at an earlier point. That way we can access the reader during task setup and use it and its properties during initialization of the SourceStreamTask. --- .../streaming/api/operators/SourceOperator.java | 41 +++++++++++++++++----- .../runtime/tasks/SourceOperatorStreamTask.java | 9 ++++- .../api/operators/SourceOperatorTest.java | 4 +-- .../operators/source/TestingSourceOperator.java | 18 ++++------ 4 files changed, 47 insertions(+), 25 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index c0e6e21..b4d6db4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -131,9 +131,27 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> this.localHostname = checkNotNull(localHostname); } - @Override - public void open() throws Exception { + /** + * Initializes the reader. The code from this method should ideally happen in the + * constructor or in the operator factory even. It has to happen here at a slightly + * later stage, because of the lazy metric initialization. + * + * <p>Calling this method explicitly is an optional way to have the reader + * initialization a bit earlier than in open(), as needed by the + * {@link org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask} + * + * <p>This code should move to the constructor once the metric groups are available + * at task setup time. + */ + public void initReader() throws Exception { + if (sourceReader != null) { + return; + } + final MetricGroup metricGroup = getMetricGroup(); + assert metricGroup != null; + + final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); final SourceReaderContext context = new SourceReaderContext() { @Override @@ -153,7 +171,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> @Override public int getIndexOfSubtask() { - return getRuntimeContext().getIndexOfThisSubtask(); + return subtaskIndex; } @Override @@ -167,16 +185,21 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> } }; + sourceReader = readerFactory.apply(context); + } + + @Override + public void open() throws Exception { + initReader(); + // in the future when we support both batch and streaming modes for the source operator, // and when this one is migrated to the "eager initialization" operator (StreamOperatorV2), // then we should evaluate this during operator construction. eventTimeLogic = TimestampsAndWatermarks.createStreamingEventTimeLogic( - watermarkStrategy, - metricGroup, - getProcessingTimeService(), - getExecutionConfig().getAutoWatermarkInterval()); - - sourceReader = readerFactory.apply(context); + watermarkStrategy, + getMetricGroup(), + getProcessingTimeService(), + getExecutionConfig().getAutoWatermarkInterval()); // restore the state if necessary. final List<SplitT> splits = CollectionUtil.iterableToList(readerState.get()); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java index f32f7e3f..bd4fe74 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java @@ -46,7 +46,14 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, } @Override - public void init() { + public void init() throws Exception { + final SourceOperator<T, ?> sourceOperator = headOperator; + // reader initialization, which cannot happen in the constructor due to the + // lazy metric group initialization. We do this here now, rather than + // later (in open()) so that we can access the reader when setting up the + // input processors + sourceOperator.initReader(); + StreamTaskInput<T> input = new StreamTaskSourceInput<>(headOperator); output = new AsyncDataOutputToOutput<>( operatorChain.getChainEntryPoint(), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java index 80b9646..c0ce943 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java @@ -91,9 +91,7 @@ public class SourceOperatorTest { @After public void cleanUp() throws Exception { operator.close(); - if (((TestingSourceOperator<Integer>) operator).isReaderCreated()) { - assertTrue(mockSourceReader.isClosed()); - } + assertTrue(mockSourceReader.isClosed()); } @Test diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java index 14b94eb..b5a3106 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java @@ -43,8 +43,6 @@ public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit private final int subtaskIndex; private final int parallelism; - private volatile boolean readerCreated; - public TestingSourceOperator( SourceReader<T, MockSourceSplit> reader, WatermarkStrategy<T> watermarkStrategy, @@ -81,13 +79,13 @@ public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit this.subtaskIndex = subtaskIndex; this.parallelism = parallelism; this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); - this.readerCreated = false; - } - @Override - public void open() throws Exception { - super.open(); - readerCreated = true; + // unchecked wrapping is okay to keep tests simpler + try { + initReader(); + } catch (Exception e) { + throw new RuntimeException(e); + } } @Override @@ -102,8 +100,4 @@ public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit cfg.setAutoWatermarkInterval(100); return cfg; } - - public boolean isReaderCreated() { - return readerCreated; - } }
