This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cac1bb51f3f7a39b71cfc016e013e45ad6a1b0f9
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Nov 23 21:26:52 2020 +0100

    [FLINK-20270][refactor] Initialize reader in SourceOperator at an earlier point.
    
    That way we can access the reader during task setup and use it and its properties
    during initialization of the SourceOperatorStreamTask.
---
 .../streaming/api/operators/SourceOperator.java    | 41 +++++++++++++++++-----
 .../runtime/tasks/SourceOperatorStreamTask.java    |  9 ++++-
 .../api/operators/SourceOperatorTest.java          |  4 +--
 .../operators/source/TestingSourceOperator.java    | 18 ++++------
 4 files changed, 47 insertions(+), 25 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index c0e6e21..b4d6db4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -131,9 +131,27 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
                this.localHostname = checkNotNull(localHostname);
        }
 
-       @Override
-       public void open() throws Exception {
+       /**
+        * Initializes the reader. The code from this method should ideally 
happen in the
+        * constructor or in the operator factory even. It has to happen here 
at a slightly
+        * later stage, because of the lazy metric initialization.
+        *
+        * <p>Calling this method explicitly is an optional way to have the 
reader
+        * initialization a bit earlier than in open(), as needed by the
+        * {@link 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask}
+        *
+        * <p>This code should move to the constructor once the metric groups 
are available
+        * at task setup time.
+        */
+       public void initReader() throws Exception {
+               if (sourceReader != null) {
+                       return;
+               }
+
                final MetricGroup metricGroup = getMetricGroup();
+               assert metricGroup != null;
+
+               final int subtaskIndex = 
getRuntimeContext().getIndexOfThisSubtask();
 
                final SourceReaderContext context = new SourceReaderContext() {
                        @Override
@@ -153,7 +171,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 
                        @Override
                        public int getIndexOfSubtask() {
-                               return 
getRuntimeContext().getIndexOfThisSubtask();
+                               return subtaskIndex;
                        }
 
                        @Override
@@ -167,16 +185,21 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
                        }
                };
 
+               sourceReader = readerFactory.apply(context);
+       }
+
+       @Override
+       public void open() throws Exception {
+               initReader();
+
                // in the future when we support both batch and streaming modes 
for the source operator,
                // and when this one is migrated to the "eager initialization" 
operator (StreamOperatorV2),
                // then we should evaluate this during operator construction.
                eventTimeLogic = 
TimestampsAndWatermarks.createStreamingEventTimeLogic(
-                               watermarkStrategy,
-                               metricGroup,
-                               getProcessingTimeService(),
-                               
getExecutionConfig().getAutoWatermarkInterval());
-
-               sourceReader = readerFactory.apply(context);
+                       watermarkStrategy,
+                       getMetricGroup(),
+                       getProcessingTimeService(),
+                       getExecutionConfig().getAutoWatermarkInterval());
 
                // restore the state if necessary.
                final List<SplitT> splits = 
CollectionUtil.iterableToList(readerState.get());
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index f32f7e3f..bd4fe74 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -46,7 +46,14 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
        }
 
        @Override
-       public void init() {
+       public void init() throws Exception {
+               final SourceOperator<T, ?> sourceOperator = headOperator;
+               // reader initialization, which cannot happen in the 
constructor due to the
+               // lazy metric group initialization. We do this here now, 
rather than
+               // later (in open()) so that we can access the reader when 
setting up the
+               // input processors
+               sourceOperator.initReader();
+
                StreamTaskInput<T> input = new 
StreamTaskSourceInput<>(headOperator);
                output = new AsyncDataOutputToOutput<>(
                        operatorChain.getChainEntryPoint(),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 80b9646..c0ce943 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -91,9 +91,7 @@ public class SourceOperatorTest {
        @After
        public void cleanUp() throws Exception {
                operator.close();
-               if (((TestingSourceOperator<Integer>) 
operator).isReaderCreated()) {
-                       assertTrue(mockSourceReader.isClosed());
-               }
+               assertTrue(mockSourceReader.isClosed());
        }
 
        @Test
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
index 14b94eb..b5a3106 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -43,8 +43,6 @@ public class TestingSourceOperator<T>  extends SourceOperator<T, MockSourceSplit
        private final int subtaskIndex;
        private final int parallelism;
 
-       private volatile boolean readerCreated;
-
        public TestingSourceOperator(
                        SourceReader<T, MockSourceSplit> reader,
                        WatermarkStrategy<T> watermarkStrategy,
@@ -81,13 +79,13 @@ public class TestingSourceOperator<T>  extends SourceOperator<T, MockSourceSplit
                this.subtaskIndex = subtaskIndex;
                this.parallelism = parallelism;
                this.metrics = 
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
-               this.readerCreated = false;
-       }
 
-       @Override
-       public void open() throws Exception {
-               super.open();
-               readerCreated = true;
+               // unchecked wrapping is okay to keep tests simpler
+               try {
+                       initReader();
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
        }
 
        @Override
@@ -102,8 +100,4 @@ public class TestingSourceOperator<T>  extends SourceOperator<T, MockSourceSplit
                cfg.setAutoWatermarkInterval(100);
                return cfg;
        }
-
-       public boolean isReaderCreated() {
-               return readerCreated;
-       }
 }

Reply via email to