This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b8704a0cdf5b9c55a4449a90a9fa80e9cb7b7432
Author: Aljoscha Krettek <[email protected]>
AuthorDate: Fri Jun 14 17:23:04 2019 +0800

    [FLINK-12686][datastream] Remove StreamExecutionEnvironment from StreamGraph
---
 .../optimizer/plantranslate/JobGraphGenerator.java |  2 +-
 .../flink/streaming/api/graph/StreamGraph.java     | 36 ++++++++++++++++------
 .../streaming/api/graph/StreamGraphGenerator.java  |  8 +++--
 .../api/graph/StreamingJobGraphGenerator.java      |  4 +--
 .../api/graph/StreamingJobGraphGeneratorTest.java  |  2 +-
 5 files changed, 36 insertions(+), 16 deletions(-)

diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 12c1def..471bc8d 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -282,7 +282,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
        }
 
        public static void addUserArtifactEntries(Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts, JobGraph jobGraph) {
-               if (!userArtifacts.isEmpty()) {
+               if (userArtifacts != null && !userArtifacts.isEmpty()) {
                        try {
                                java.nio.file.Path tmpDir = Files.createTempDirectory("flink-distributed-cache-" + jobGraph.getJobID());
                                for (Tuple2<String, DistributedCache.DistributedCacheEntry> originalEntry : userArtifacts) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 3f438d9..c0fedd8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.graph;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.operators.ResourceSpec;
@@ -33,6 +34,7 @@ import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -66,6 +68,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Class representing the streaming topology. It contains all the information
  * necessary to build the jobgraph for the execution.
@@ -78,12 +82,15 @@ public class StreamGraph extends StreamingPlan {
 
        private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;
 
-       private final StreamExecutionEnvironment environment;
        private final ExecutionConfig executionConfig;
        private final CheckpointConfig checkpointConfig;
 
        private boolean chaining;
 
+       private Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts;
+
+       private TimeCharacteristic timeCharacteristic;
+
        private Map<Integer, StreamNode> streamNodes;
        private Set<Integer> sources;
        private Set<Integer> sinks;
@@ -96,10 +103,9 @@ public class StreamGraph extends StreamingPlan {
        private StateBackend stateBackend;
        private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
 
-       public StreamGraph(StreamExecutionEnvironment environment) {
-               this.environment = environment;
-               this.executionConfig = environment.getConfig();
-               this.checkpointConfig = environment.getCheckpointConfig();
+       public StreamGraph(ExecutionConfig executionConfig, CheckpointConfig checkpointConfig) {
+               this.executionConfig = checkNotNull(executionConfig);
+               this.checkpointConfig = checkNotNull(checkpointConfig);
 
                // create an empty new stream graph.
                clear();
@@ -120,10 +126,6 @@ public class StreamGraph extends StreamingPlan {
                sinks = new HashSet<>();
        }
 
-       public StreamExecutionEnvironment getEnvironment() {
-               return environment;
-       }
-
        public ExecutionConfig getExecutionConfig() {
                return executionConfig;
        }
@@ -152,6 +154,22 @@ public class StreamGraph extends StreamingPlan {
                return this.stateBackend;
        }
 
+       public Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> getUserArtifacts() {
+               return userArtifacts;
+       }
+
+       public void setUserArtifacts(Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts) {
+               this.userArtifacts = userArtifacts;
+       }
+
+       public TimeCharacteristic getTimeCharacteristic() {
+               return timeCharacteristic;
+       }
+
+       public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
+               this.timeCharacteristic = timeCharacteristic;
+       }
+
        // Checkpointing
 
        public boolean isChainingEnabled() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index fc574a0..9a11459 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -105,9 +105,11 @@ public class StreamGraphGenerator {
         * Private constructor. The generator should only be invoked using {@link #generate}.
         */
        private StreamGraphGenerator(StreamExecutionEnvironment env) {
-               this.streamGraph = new StreamGraph(env);
-               this.streamGraph.setChaining(env.isChainingEnabled());
-               this.streamGraph.setStateBackend(env.getStateBackend());
+               streamGraph = new StreamGraph(env.getConfig(), env.getCheckpointConfig());
+               streamGraph.setChaining(env.isChainingEnabled());
+               streamGraph.setStateBackend(env.getStateBackend());
+               streamGraph.setUserArtifacts(env.getCachedFiles());
+               streamGraph.setTimeCharacteristic(env.getStreamTimeCharacteristic());
                this.env = env;
                this.alreadyTransformed = new HashMap<>();
        }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 99a6f2a..19151ce 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -164,7 +164,7 @@ public class StreamingJobGraphGenerator {
 
                configureCheckpointing();
 
-               JobGraphGenerator.addUserArtifactEntries(streamGraph.getEnvironment().getCachedFiles(), jobGraph);
+               JobGraphGenerator.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);
 
                // set the ExecutionConfig last when it has been finalized
                try {
@@ -464,7 +464,7 @@ public class StreamingJobGraphGenerator {
                config.setNonChainedOutputs(nonChainableOutputs);
                config.setChainedOutputs(chainableOutputs);
 
-               config.setTimeCharacteristic(streamGraph.getEnvironment().getStreamTimeCharacteristic());
+               config.setTimeCharacteristic(streamGraph.getTimeCharacteristic());
 
                final CheckpointConfig checkpointCfg = streamGraph.getCheckpointConfig();
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index c909b32..284db19 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -126,7 +126,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
        @Test
        public void testDisabledCheckpointing() throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-               StreamGraph streamGraph = new StreamGraph(env);
+               StreamGraph streamGraph = new StreamGraph(env.getConfig(), env.getCheckpointConfig());
                assertFalse("Checkpointing enabled", streamGraph.getCheckpointConfig().isCheckpointingEnabled());
 
                JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);

Reply via email to