This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b8704a0cdf5b9c55a4449a90a9fa80e9cb7b7432 Author: Aljoscha Krettek <[email protected]> AuthorDate: Fri Jun 14 17:23:04 2019 +0800 [FLINK-12686][datastream] Remove StreamExecutionEnvironment from StreamGraph --- .../optimizer/plantranslate/JobGraphGenerator.java | 2 +- .../flink/streaming/api/graph/StreamGraph.java | 36 ++++++++++++++++------ .../streaming/api/graph/StreamGraphGenerator.java | 8 +++-- .../api/graph/StreamingJobGraphGenerator.java | 4 +-- .../api/graph/StreamingJobGraphGeneratorTest.java | 2 +- 5 files changed, 36 insertions(+), 16 deletions(-) diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java index 12c1def..471bc8d 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java @@ -282,7 +282,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> { } public static void addUserArtifactEntries(Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts, JobGraph jobGraph) { - if (!userArtifacts.isEmpty()) { + if (userArtifacts != null && !userArtifacts.isEmpty()) { try { java.nio.file.Path tmpDir = Files.createTempDirectory("flink-distributed-cache-" + jobGraph.getJobID()); for (Tuple2<String, DistributedCache.DistributedCacheEntry> originalEntry : userArtifacts) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 3f438d9..c0fedd8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.common.operators.ResourceSpec; @@ -33,6 +34,7 @@ import org.apache.flink.optimizer.plan.StreamingPlan; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -66,6 +68,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * Class representing the streaming topology. It contains all the information * necessary to build the jobgraph for the execution. @@ -78,12 +82,15 @@ public class StreamGraph extends StreamingPlan { private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME; - private final StreamExecutionEnvironment environment; private final ExecutionConfig executionConfig; private final CheckpointConfig checkpointConfig; private boolean chaining; + private Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts; + + private TimeCharacteristic timeCharacteristic; + private Map<Integer, StreamNode> streamNodes; private Set<Integer> sources; private Set<Integer> sinks; @@ -96,10 +103,9 @@ public class StreamGraph extends StreamingPlan { private StateBackend stateBackend; private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs; - public StreamGraph(StreamExecutionEnvironment environment) { - this.environment = environment; - this.executionConfig = environment.getConfig(); - this.checkpointConfig = environment.getCheckpointConfig(); + public StreamGraph(ExecutionConfig executionConfig, CheckpointConfig checkpointConfig) { + this.executionConfig = checkNotNull(executionConfig); + this.checkpointConfig = checkNotNull(checkpointConfig); // create an empty new stream graph. clear(); @@ -120,10 +126,6 @@ public class StreamGraph extends StreamingPlan { sinks = new HashSet<>(); } - public StreamExecutionEnvironment getEnvironment() { - return environment; - } - public ExecutionConfig getExecutionConfig() { return executionConfig; } @@ -152,6 +154,22 @@ public class StreamGraph extends StreamingPlan { return this.stateBackend; } + public Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> getUserArtifacts() { + return userArtifacts; + } + + public void setUserArtifacts(Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts) { + this.userArtifacts = userArtifacts; + } + + public TimeCharacteristic getTimeCharacteristic() { + return timeCharacteristic; + } + + public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) { + this.timeCharacteristic = timeCharacteristic; + } + // Checkpointing public boolean isChainingEnabled() { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index fc574a0..9a11459 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -105,9 +105,11 @@ public class StreamGraphGenerator { * Private constructor. The generator should only be invoked using {@link #generate}. */ private StreamGraphGenerator(StreamExecutionEnvironment env) { - this.streamGraph = new StreamGraph(env); - this.streamGraph.setChaining(env.isChainingEnabled()); - this.streamGraph.setStateBackend(env.getStateBackend()); + streamGraph = new StreamGraph(env.getConfig(), env.getCheckpointConfig()); + streamGraph.setChaining(env.isChainingEnabled()); + streamGraph.setStateBackend(env.getStateBackend()); + streamGraph.setUserArtifacts(env.getCachedFiles()); + streamGraph.setTimeCharacteristic(env.getStreamTimeCharacteristic()); this.env = env; this.alreadyTransformed = new HashMap<>(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 99a6f2a..19151ce 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -164,7 +164,7 @@ public class StreamingJobGraphGenerator { configureCheckpointing(); - JobGraphGenerator.addUserArtifactEntries(streamGraph.getEnvironment().getCachedFiles(), jobGraph); + JobGraphGenerator.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph); // set the ExecutionConfig last when it has been finalized try { @@ -464,7 +464,7 @@ public class StreamingJobGraphGenerator { config.setNonChainedOutputs(nonChainableOutputs); config.setChainedOutputs(chainableOutputs); - config.setTimeCharacteristic(streamGraph.getEnvironment().getStreamTimeCharacteristic()); + config.setTimeCharacteristic(streamGraph.getTimeCharacteristic()); final CheckpointConfig checkpointCfg = streamGraph.getCheckpointConfig(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index c909b32..284db19 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -126,7 +126,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { @Test public void testDisabledCheckpointing() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamGraph streamGraph = new StreamGraph(env); + StreamGraph streamGraph = new StreamGraph(env.getConfig(), env.getCheckpointConfig()); assertFalse("Checkpointing enabled", streamGraph.getCheckpointConfig().isCheckpointingEnabled()); JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
