This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4b5b09bd2ff988498b3557e45dd194e46ec45b6b
Author: Aljoscha Krettek <[email protected]>
AuthorDate: Fri Jun 14 17:04:09 2019 +0800

    [FLINK-12686][datastream] Remove StreamExecutionEnvironment from StreamNode
---
 .../flink/streaming/api/graph/StreamGraph.java     |  2 +-
 .../streaming/api/graph/StreamGraphGenerator.java  | 20 +++++++++++++----
 .../flink/streaming/api/graph/StreamNode.java      | 25 +++++++---------------
 .../api/graph/StreamGraphGeneratorTest.java        |  8 +++----
 .../runtime/tasks/OneInputStreamTaskTest.java      |  2 --
 .../runtime/tasks/StreamConfigChainer.java         |  8 +++----
 .../runtime/tasks/StreamTaskTestHarness.java       |  4 ++--
 .../tasks/TwoInputStreamTaskTestHarness.java       |  4 ++--
 8 files changed, 37 insertions(+), 36 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 4cc18eb..3f438d9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -260,7 +260,7 @@ public class StreamGraph extends StreamingPlan {
                        throw new RuntimeException("Duplicate vertexID " + 
vertexID);
                }
 
-               StreamNode vertex = new StreamNode(environment,
+               StreamNode vertex = new StreamNode(
                        vertexID,
                        slotSharingGroup,
                        coLocationGroup,
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 792b719..fc574a0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -197,7 +198,10 @@ public class StreamGraphGenerator {
 
                if (transform.getBufferTimeout() >= 0) {
                        streamGraph.setBufferTimeout(transform.getId(), 
transform.getBufferTimeout());
+               } else {
+                       streamGraph.setBufferTimeout(transform.getId(), 
env.getBufferTimeout());
                }
+
                if (transform.getUid() != null) {
                        streamGraph.setTransformationUID(transform.getId(), 
transform.getUid());
                }
@@ -490,7 +494,9 @@ public class StreamGraphGenerator {
                        streamGraph.setInputFormat(source.getId(),
                                        ((InputFormatOperatorFactory<T>) 
source.getOperatorFactory()).getInputFormat());
                }
-               streamGraph.setParallelism(source.getId(), 
source.getParallelism());
+               int parallelism = source.getParallelism() != 
ExecutionConfig.PARALLELISM_DEFAULT ?
+                       source.getParallelism() : env.getParallelism();
+               streamGraph.setParallelism(source.getId(), parallelism);
                streamGraph.setMaxParallelism(source.getId(), 
source.getMaxParallelism());
                return Collections.singleton(source.getId());
        }
@@ -517,7 +523,9 @@ public class StreamGraphGenerator {
                        streamGraph.setOutputFormat(sink.getId(), 
((OutputFormatOperatorFactory) operatorFactory).getOutputFormat());
                }
 
-               streamGraph.setParallelism(sink.getId(), sink.getParallelism());
+               int parallelism = sink.getParallelism() != 
ExecutionConfig.PARALLELISM_DEFAULT ?
+                       sink.getParallelism() : env.getParallelism();
+               streamGraph.setParallelism(sink.getId(), parallelism);
                streamGraph.setMaxParallelism(sink.getId(), 
sink.getMaxParallelism());
 
                for (Integer inputId: inputIds) {
@@ -565,7 +573,9 @@ public class StreamGraphGenerator {
                        streamGraph.setOneInputStateKey(transform.getId(), 
transform.getStateKeySelector(), keySerializer);
                }
 
-               streamGraph.setParallelism(transform.getId(), 
transform.getParallelism());
+               int parallelism = transform.getParallelism() != 
ExecutionConfig.PARALLELISM_DEFAULT ?
+                       transform.getParallelism() : env.getParallelism();
+               streamGraph.setParallelism(transform.getId(), parallelism);
                streamGraph.setMaxParallelism(transform.getId(), 
transform.getMaxParallelism());
 
                for (Integer inputId: inputIds) {
@@ -612,7 +622,9 @@ public class StreamGraphGenerator {
                        streamGraph.setTwoInputStateKey(transform.getId(), 
transform.getStateKeySelector1(), transform.getStateKeySelector2(), 
keySerializer);
                }
 
-               streamGraph.setParallelism(transform.getId(), 
transform.getParallelism());
+               int parallelism = transform.getParallelism() != 
ExecutionConfig.PARALLELISM_DEFAULT ?
+                       transform.getParallelism() : env.getParallelism();
+               streamGraph.setParallelism(transform.getId(), parallelism);
                streamGraph.setMaxParallelism(transform.getId(), 
transform.getMaxParallelism());
 
                for (Integer inputId: inputIds1) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 579a605..e3cc498 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.operators.ResourceSpec;
@@ -27,7 +26,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
@@ -46,10 +44,8 @@ public class StreamNode implements Serializable {
 
        private static final long serialVersionUID = 1L;
 
-       private transient StreamExecutionEnvironment env;
-
        private final int id;
-       private Integer parallelism = null;
+       private int parallelism;
        /**
         * Maximum parallelism for this stream node. The maximum parallelism is 
the upper limit for
         * dynamic scaling and the number of key groups used for partitioned 
state.
@@ -57,7 +53,7 @@ public class StreamNode implements Serializable {
        private int maxParallelism;
        private ResourceSpec minResources = ResourceSpec.DEFAULT;
        private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
-       private Long bufferTimeout = null;
+       private long bufferTimeout;
        private final String operatorName;
        private String slotSharingGroup;
        private @Nullable String coLocationGroup;
@@ -83,7 +79,7 @@ public class StreamNode implements Serializable {
        private String userHash;
 
        @VisibleForTesting
-       public StreamNode(StreamExecutionEnvironment env,
+       public StreamNode(
                        Integer id,
                        String slotSharingGroup,
                        @Nullable String coLocationGroup,
@@ -91,11 +87,11 @@ public class StreamNode implements Serializable {
                        String operatorName,
                        List<OutputSelector<?>> outputSelector,
                        Class<? extends AbstractInvokable> jobVertexClass) {
-               this(env, id, slotSharingGroup, coLocationGroup, 
SimpleOperatorFactory.of(operator),
+               this(id, slotSharingGroup, coLocationGroup, 
SimpleOperatorFactory.of(operator),
                                operatorName, outputSelector, jobVertexClass);
        }
 
-       public StreamNode(StreamExecutionEnvironment env,
+       public StreamNode(
                Integer id,
                String slotSharingGroup,
                @Nullable String coLocationGroup,
@@ -104,7 +100,6 @@ public class StreamNode implements Serializable {
                List<OutputSelector<?>> outputSelector,
                Class<? extends AbstractInvokable> jobVertexClass) {
 
-               this.env = env;
                this.id = id;
                this.operatorName = operatorName;
                this.operatorFactory = operatorFactory;
@@ -163,11 +158,7 @@ public class StreamNode implements Serializable {
        }
 
        public int getParallelism() {
-               if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
-                       return env.getParallelism();
-               } else {
-                       return parallelism;
-               }
+               return parallelism;
        }
 
        public void setParallelism(Integer parallelism) {
@@ -205,8 +196,8 @@ public class StreamNode implements Serializable {
                this.preferredResources = preferredResources;
        }
 
-       public Long getBufferTimeout() {
-               return bufferTimeout != null ? bufferTimeout : 
env.getBufferTimeout();
+       public long getBufferTimeout() {
+               return bufferTimeout;
        }
 
        public void setBufferTimeout(Long bufferTimeout) {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 0ae483e..19e2fb6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -85,16 +85,16 @@ public class StreamGraphGeneratorTest {
                        switch (node.getOperatorName()) {
 
                                case "A":
-                                       assertEquals(77L, 
node.getBufferTimeout().longValue());
+                                       assertEquals(77L, 
node.getBufferTimeout());
                                        break;
                                case "B":
-                                       assertEquals(0L, 
node.getBufferTimeout().longValue());
+                                       assertEquals(0L, 
node.getBufferTimeout());
                                        break;
                                case "C":
-                                       assertEquals(12L, 
node.getBufferTimeout().longValue());
+                                       assertEquals(12L, 
node.getBufferTimeout());
                                        break;
                                case "D":
-                                       assertEquals(77L, 
node.getBufferTimeout().longValue());
+                                       assertEquals(77L, 
node.getBufferTimeout());
                                        break;
                                default:
                                        assertTrue(node.getOperator() 
instanceof StreamSource);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 41f4220..e238cad 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -794,7 +794,6 @@ public class OneInputStreamTaskTest extends TestLogger {
 
                        StreamEdge outputEdge = new StreamEdge(
                                new StreamNode(
-                                       null,
                                        chainedIndex - 1,
                                        null,
                                        null,
@@ -804,7 +803,6 @@ public class OneInputStreamTaskTest extends TestLogger {
                                        null
                                ),
                                new StreamNode(
-                                       null,
                                        chainedIndex,
                                        null,
                                        null,
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
index 60f450f..06e1b99 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
@@ -93,8 +93,8 @@ public class StreamConfigChainer {
 
                tailConfig.setChainedOutputs(Collections.singletonList(
                        new StreamEdge(
-                               new StreamNode(null, 
tailConfig.getChainIndex(), null, null, (StreamOperator<?>) null, null, null, 
null),
-                               new StreamNode(null, chainIndex, null, null, 
(StreamOperator<?>) null, null, null, null),
+                               new StreamNode(tailConfig.getChainIndex(), 
null, null, (StreamOperator<?>) null, null, null, null),
+                               new StreamNode(chainIndex, null, null, 
(StreamOperator<?>) null, null, null, null),
                                0,
                                Collections.<String>emptyList(),
                                null,
@@ -120,8 +120,8 @@ public class StreamConfigChainer {
                List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
                outEdgesInOrder.add(
                        new StreamEdge(
-                               new StreamNode(null, chainIndex, null, null, 
(StreamOperator<?>) null, null, null, null),
-                               new StreamNode(null, chainIndex , null, null, 
(StreamOperator<?>) null, null, null, null),
+                               new StreamNode(chainIndex, null, null, 
(StreamOperator<?>) null, null, null, null),
+                               new StreamNode(chainIndex , null, null, 
(StreamOperator<?>) null, null, null, null),
                                0,
                                Collections.<String>emptyList(),
                                new BroadcastPartitioner<Object>(),
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index be31923..c312008 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -195,8 +195,8 @@ public class StreamTaskTestHarness<OUT> {
                };
 
                List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
-               StreamNode sourceVertexDummy = new StreamNode(null, 0, "group", 
null, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), 
SourceStreamTask.class);
-               StreamNode targetVertexDummy = new StreamNode(null, 1, "group", 
null, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), 
SourceStreamTask.class);
+               StreamNode sourceVertexDummy = new StreamNode(0, "group", null, 
dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), 
SourceStreamTask.class);
+               StreamNode targetVertexDummy = new StreamNode(1, "group", null, 
dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), 
SourceStreamTask.class);
 
                outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, 
targetVertexDummy, 0, new LinkedList<String>(), new 
BroadcastPartitioner<Object>(), null /* output tag */));
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
index aa900ed..4f9f716 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
@@ -116,8 +116,8 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> 
extends StreamTaskTest
                        private static final long serialVersionUID = 1L;
                };
 
-               StreamNode sourceVertexDummy = new StreamNode(null, 0, "default 
group", null, dummyOperator, "source dummy", new 
LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
-               StreamNode targetVertexDummy = new StreamNode(null, 1, "default 
group", null, dummyOperator, "target dummy", new 
LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
+               StreamNode sourceVertexDummy = new StreamNode(0, "default 
group", null, dummyOperator, "source dummy", new 
LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
+               StreamNode targetVertexDummy = new StreamNode(1, "default 
group", null, dummyOperator, "target dummy", new 
LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
 
                for (int i = 0; i < numInputGates; i++) {
 

Reply via email to