This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cb4c32f35987fdfc7e22a789f8888f13c0afb484
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Wed Mar 4 17:34:30 2020 +0100

    [FLINK-16316][operators] Pass StreamTaskStateInitializer to operators from outside
    
    This removes another dependency on the StreamTask from AbstractStreamOperator
---
 .../apache/flink/state/api/output/BoundedStreamTask.java   |  2 +-
 .../apache/flink/state/api/output/SnapshotUtilsTest.java   |  3 ++-
 .../streaming/api/operators/AbstractStreamOperator.java    |  4 +---
 .../flink/streaming/api/operators/StreamOperator.java      |  2 +-
 .../flink/streaming/runtime/tasks/OperatorChain.java       | 14 +++++++-------
 .../apache/flink/streaming/runtime/tasks/StreamTask.java   |  2 +-
 .../operators/AbstractUdfStreamOperatorLifecycleTest.java  |  2 +-
 .../streaming/util/AbstractStreamOperatorTestHarness.java  |  2 +-
 .../runtime/StreamTaskSelectiveReadingITCase.java          |  3 +--
 9 files changed, 16 insertions(+), 18 deletions(-)

diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
index 7a801a4..2aacaa9 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
@@ -81,7 +81,7 @@ class BoundedStreamTask<IN, OUT, OP extends 
OneInputStreamOperator<IN, OUT> & Bo
                                configuration,
                                new CollectorWrapper<>(collector));
                headOperator = headOperatorAndTimeService.f0;
-               headOperator.initializeState();
+               
headOperator.initializeState(createStreamTaskStateInitializer());
                headOperator.open();
        }
 
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
index de1f146..35266ef 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import org.junit.Assert;
@@ -98,7 +99,7 @@ public class SnapshotUtilsTest {
                }
 
                @Override
-               public void initializeState() throws Exception {
+               public void initializeState(StreamTaskStateInitializer 
streamTaskStateManager) throws Exception {
                        ACTUAL_ORDER_TRACKING.add("initializeState");
                }
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 2f8d0a1..98f743f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -248,7 +248,7 @@ public abstract class AbstractStreamOperator<OUT>
        }
 
        @Override
-       public final void initializeState() throws Exception {
+       public final void initializeState(StreamTaskStateInitializer 
streamTaskStateManager) throws Exception {
 
                final TypeSerializer<?> keySerializer = 
config.getStateKeySerializer(getUserCodeClassloader());
 
@@ -256,8 +256,6 @@ public abstract class AbstractStreamOperator<OUT>
                        Preconditions.checkNotNull(getContainingTask());
                final CloseableRegistry streamTaskCloseableRegistry =
                        
Preconditions.checkNotNull(containingTask.getCancelables());
-               final StreamTaskStateInitializer streamTaskStateManager =
-                       
Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());
 
                final StreamOperatorStateContext context =
                        streamTaskStateManager.streamOperatorStateContext(
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index f5be378..69a5056 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -127,7 +127,7 @@ public interface StreamOperator<OUT> extends 
CheckpointListener, KeyContext, Dis
        /**
         * Provides a context to initialize all state in the operator.
         */
-       void initializeState() throws Exception;
+       void initializeState(StreamTaskStateInitializer streamTaskStateManager) 
throws Exception;
 
        // 
------------------------------------------------------------------------
        //  miscellaneous
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 52c0174..3e1e3f0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -47,6 +47,7 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
@@ -282,15 +283,14 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
        }
 
        /**
-        * Executes {@link StreamOperator#initializeState()} followed by {@link 
StreamOperator#open()}
-        * of each operator in the chain of this {@link StreamTask}. State 
initialization and opening
-        * happens from <b>tail to head</b> operator in the chain, contrary to 
{@link StreamOperator#close()}
-        * which happens <b>head to tail</b>(see {@link 
#closeOperators(StreamTaskActionExecutor)}).
+        * Initialize state and open all operators in the chain from <b>tail to 
head</b>,
+        * contrary to {@link StreamOperator#close()} which happens <b>head to 
tail</b>
+        * (see {@link #closeOperators(StreamTaskActionExecutor)}).
         */
-       protected void initializeStateAndOpenOperators() throws Exception {
+       protected void 
initializeStateAndOpenOperators(StreamTaskStateInitializer 
streamTaskStateInitializer) throws Exception {
                for (StreamOperatorWrapper<?, ?> operatorWrapper : 
getAllOperators(true)) {
                        StreamOperator<?> operator = 
operatorWrapper.getStreamOperator();
-                       operator.initializeState();
+                       operator.initializeState(streamTaskStateInitializer);
                        operator.open();
                }
        }
@@ -298,7 +298,7 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
        /**
         * Closes all operators in a chain effect way. Closing happens from 
<b>head to tail</b> operator
         * in the chain, contrary to {@link StreamOperator#open()} which 
happens <b>tail to head</b>
-        * (see {@link #initializeStateAndOpenOperators()}).
+        * (see {@link 
#initializeStateAndOpenOperators(StreamTaskStateInitializer)}).
         */
        protected void closeOperators(StreamTaskActionExecutor actionExecutor) 
throws Exception {
                if (headOperatorWrapper != null) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index f2972cb..d3a79b6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -433,7 +433,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        // both the following operations are protected by the 
lock
                        // so that we avoid race conditions in the case that 
initializeState()
                        // registers a timer, that fires before the open() is 
called.
-                       operatorChain.initializeStateAndOpenOperators();
+                       
operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
                });
 
                isRunning = true;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index ea91bd0..5fb8830 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -92,7 +92,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
                        "getCurrentKey[], " +
                        "getMetricGroup[], " +
                        "getOperatorID[], " +
-                       "initializeState[], " +
+                       "initializeState[interface 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializer], " +
                        "notifyCheckpointComplete[long], " +
                        "open[], " +
                        "prepareSnapshotPreBarrier[long], " +
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 7d0cd51..13624e6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -495,7 +495,7 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
                        }
                }
 
-               operator.initializeState();
+               
operator.initializeState(mockTask.createStreamTaskStateInitializer());
                initializeCalled = true;
        }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskSelectiveReadingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskSelectiveReadingITCase.java
index 28840d9..7fa49f8 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskSelectiveReadingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskSelectiveReadingITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.util.TestSequentialReadingStreamOperator;
 import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
 
@@ -60,7 +59,7 @@ public class StreamTaskSelectiveReadingITCase {
                        .setParallelism(2);
                TestListResultSink<String> resultSink = new 
TestListResultSink<>();
 
-               TwoInputStreamOperator<String, Integer, String> 
twoInputStreamOperator = new TestSequentialReadingStreamOperator("Operator0");
+               TestSequentialReadingStreamOperator twoInputStreamOperator = 
new TestSequentialReadingStreamOperator("Operator0");
                
twoInputStreamOperator.setChainingStrategy(ChainingStrategy.NEVER);
 
                source0.connect(source1)

Reply via email to