This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4b5b09bd2ff988498b3557e45dd194e46ec45b6b Author: Aljoscha Krettek <[email protected]> AuthorDate: Fri Jun 14 17:04:09 2019 +0800 [FLINK-12686][datastream] Remove StreamExecutionEnvironment from StreamNode --- .../flink/streaming/api/graph/StreamGraph.java | 2 +- .../streaming/api/graph/StreamGraphGenerator.java | 20 +++++++++++++---- .../flink/streaming/api/graph/StreamNode.java | 25 +++++++--------------- .../api/graph/StreamGraphGeneratorTest.java | 8 +++---- .../runtime/tasks/OneInputStreamTaskTest.java | 2 -- .../runtime/tasks/StreamConfigChainer.java | 8 +++---- .../runtime/tasks/StreamTaskTestHarness.java | 4 ++-- .../tasks/TwoInputStreamTaskTestHarness.java | 4 ++-- 8 files changed, 37 insertions(+), 36 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 4cc18eb..3f438d9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -260,7 +260,7 @@ public class StreamGraph extends StreamingPlan { throw new RuntimeException("Duplicate vertexID " + vertexID); } - StreamNode vertex = new StreamNode(environment, + StreamNode vertex = new StreamNode( vertexID, slotSharingGroup, coLocationGroup, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index 792b719..fc574a0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.annotation.Internal; +import 
org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; @@ -197,7 +198,10 @@ public class StreamGraphGenerator { if (transform.getBufferTimeout() >= 0) { streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout()); + } else { + streamGraph.setBufferTimeout(transform.getId(), env.getBufferTimeout()); } + if (transform.getUid() != null) { streamGraph.setTransformationUID(transform.getId(), transform.getUid()); } @@ -490,7 +494,9 @@ public class StreamGraphGenerator { streamGraph.setInputFormat(source.getId(), ((InputFormatOperatorFactory<T>) source.getOperatorFactory()).getInputFormat()); } - streamGraph.setParallelism(source.getId(), source.getParallelism()); + int parallelism = source.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ? + source.getParallelism() : env.getParallelism(); + streamGraph.setParallelism(source.getId(), parallelism); streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism()); return Collections.singleton(source.getId()); } @@ -517,7 +523,9 @@ public class StreamGraphGenerator { streamGraph.setOutputFormat(sink.getId(), ((OutputFormatOperatorFactory) operatorFactory).getOutputFormat()); } - streamGraph.setParallelism(sink.getId(), sink.getParallelism()); + int parallelism = sink.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ? 
+ sink.getParallelism() : env.getParallelism(); + streamGraph.setParallelism(sink.getId(), parallelism); streamGraph.setMaxParallelism(sink.getId(), sink.getMaxParallelism()); for (Integer inputId: inputIds) { @@ -565,7 +573,9 @@ public class StreamGraphGenerator { streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer); } - streamGraph.setParallelism(transform.getId(), transform.getParallelism()); + int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ? + transform.getParallelism() : env.getParallelism(); + streamGraph.setParallelism(transform.getId(), parallelism); streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism()); for (Integer inputId: inputIds) { @@ -612,7 +622,9 @@ public class StreamGraphGenerator { streamGraph.setTwoInputStateKey(transform.getId(), transform.getStateKeySelector1(), transform.getStateKeySelector2(), keySerializer); } - streamGraph.setParallelism(transform.getId(), transform.getParallelism()); + int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ? 
+ transform.getParallelism() : env.getParallelism(); + streamGraph.setParallelism(transform.getId(), parallelism); streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism()); for (Integer inputId: inputIds1) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index 579a605..e3cc498 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.common.operators.ResourceSpec; @@ -27,7 +26,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.streaming.api.collector.selector.OutputSelector; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; @@ -46,10 +44,8 @@ public class StreamNode implements Serializable { private static final long serialVersionUID = 1L; - private transient StreamExecutionEnvironment env; - private final int id; - private Integer parallelism = null; + private int parallelism; /** * Maximum parallelism for this stream node. 
The maximum parallelism is the upper limit for * dynamic scaling and the number of key groups used for partitioned state. @@ -57,7 +53,7 @@ public class StreamNode implements Serializable { private int maxParallelism; private ResourceSpec minResources = ResourceSpec.DEFAULT; private ResourceSpec preferredResources = ResourceSpec.DEFAULT; - private Long bufferTimeout = null; + private long bufferTimeout; private final String operatorName; private String slotSharingGroup; private @Nullable String coLocationGroup; @@ -83,7 +79,7 @@ public class StreamNode implements Serializable { private String userHash; @VisibleForTesting - public StreamNode(StreamExecutionEnvironment env, + public StreamNode( Integer id, String slotSharingGroup, @Nullable String coLocationGroup, @@ -91,11 +87,11 @@ public class StreamNode implements Serializable { String operatorName, List<OutputSelector<?>> outputSelector, Class<? extends AbstractInvokable> jobVertexClass) { - this(env, id, slotSharingGroup, coLocationGroup, SimpleOperatorFactory.of(operator), + this(id, slotSharingGroup, coLocationGroup, SimpleOperatorFactory.of(operator), operatorName, outputSelector, jobVertexClass); } - public StreamNode(StreamExecutionEnvironment env, + public StreamNode( Integer id, String slotSharingGroup, @Nullable String coLocationGroup, @@ -104,7 +100,6 @@ public class StreamNode implements Serializable { List<OutputSelector<?>> outputSelector, Class<? 
extends AbstractInvokable> jobVertexClass) { - this.env = env; this.id = id; this.operatorName = operatorName; this.operatorFactory = operatorFactory; @@ -163,11 +158,7 @@ public class StreamNode implements Serializable { } public int getParallelism() { - if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) { - return env.getParallelism(); - } else { - return parallelism; - } + return parallelism; } public void setParallelism(Integer parallelism) { @@ -205,8 +196,8 @@ public class StreamNode implements Serializable { this.preferredResources = preferredResources; } - public Long getBufferTimeout() { - return bufferTimeout != null ? bufferTimeout : env.getBufferTimeout(); + public long getBufferTimeout() { + return bufferTimeout; } public void setBufferTimeout(Long bufferTimeout) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index 0ae483e..19e2fb6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -85,16 +85,16 @@ public class StreamGraphGeneratorTest { switch (node.getOperatorName()) { case "A": - assertEquals(77L, node.getBufferTimeout().longValue()); + assertEquals(77L, node.getBufferTimeout()); break; case "B": - assertEquals(0L, node.getBufferTimeout().longValue()); + assertEquals(0L, node.getBufferTimeout()); break; case "C": - assertEquals(12L, node.getBufferTimeout().longValue()); + assertEquals(12L, node.getBufferTimeout()); break; case "D": - assertEquals(77L, node.getBufferTimeout().longValue()); + assertEquals(77L, node.getBufferTimeout()); break; default: assertTrue(node.getOperator() instanceof StreamSource); diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 41f4220..e238cad 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -794,7 +794,6 @@ public class OneInputStreamTaskTest extends TestLogger { StreamEdge outputEdge = new StreamEdge( new StreamNode( - null, chainedIndex - 1, null, null, @@ -804,7 +803,6 @@ public class OneInputStreamTaskTest extends TestLogger { null ), new StreamNode( - null, chainedIndex, null, null, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java index 60f450f..06e1b99 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java @@ -93,8 +93,8 @@ public class StreamConfigChainer { tailConfig.setChainedOutputs(Collections.singletonList( new StreamEdge( - new StreamNode(null, tailConfig.getChainIndex(), null, null, (StreamOperator<?>) null, null, null, null), - new StreamNode(null, chainIndex, null, null, (StreamOperator<?>) null, null, null, null), + new StreamNode(tailConfig.getChainIndex(), null, null, (StreamOperator<?>) null, null, null, null), + new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null, null), 0, Collections.<String>emptyList(), null, @@ -120,8 +120,8 @@ public class StreamConfigChainer { List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>(); outEdgesInOrder.add( new StreamEdge( - new StreamNode(null, chainIndex, null, null, 
(StreamOperator<?>) null, null, null, null), - new StreamNode(null, chainIndex , null, null, (StreamOperator<?>) null, null, null, null), + new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null, null), + new StreamNode(chainIndex , null, null, (StreamOperator<?>) null, null, null, null), 0, Collections.<String>emptyList(), new BroadcastPartitioner<Object>(), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index be31923..c312008 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -195,8 +195,8 @@ public class StreamTaskTestHarness<OUT> { }; List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>(); - StreamNode sourceVertexDummy = new StreamNode(null, 0, "group", null, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class); - StreamNode targetVertexDummy = new StreamNode(null, 1, "group", null, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class); + StreamNode sourceVertexDummy = new StreamNode(0, "group", null, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class); + StreamNode targetVertexDummy = new StreamNode(1, "group", null, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class); outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>(), null /* output tag */)); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java index aa900ed..4f9f716 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java @@ -116,8 +116,8 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest private static final long serialVersionUID = 1L; }; - StreamNode sourceVertexDummy = new StreamNode(null, 0, "default group", null, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class); - StreamNode targetVertexDummy = new StreamNode(null, 1, "default group", null, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class); + StreamNode sourceVertexDummy = new StreamNode(0, "default group", null, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class); + StreamNode targetVertexDummy = new StreamNode(1, "default group", null, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class); for (int i = 0; i < numInputGates; i++) {
