This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f95db6bbaa1cabf86f624340068b41e7b4018862
Author: Aljoscha Krettek <[email protected]>
AuthorDate: Fri Jun 14 19:06:07 2019 +0800

    [FLINK-12686][datastream] Remove StreamExecutionEnvironment from StreamGraphGenerator
---
 .../environment/StreamExecutionEnvironment.java    |  11 +-
 .../streaming/api/graph/StreamGraphGenerator.java  | 138 ++++++++++++++-------
 2 files changed, 103 insertions(+), 46 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index f8330f4..6c334a6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1511,10 +1511,19 @@ public abstract class StreamExecutionEnvironment {
         */
        @Internal
        public StreamGraph getStreamGraph() {
+               return getStreamGraphGenerator().generate();
+       }
+
+       private StreamGraphGenerator getStreamGraphGenerator() {
                if (transformations.size() <= 0) {
                        throw new IllegalStateException("No operators defined 
in streaming topology. Cannot execute.");
                }
-               return StreamGraphGenerator.generate(this, transformations);
+               return new StreamGraphGenerator(transformations, config, 
checkpointCfg)
+                       .setStateBackend(defaultStateBackend)
+                       .setChaining(isChainingEnabled)
+                       .setUserArtifacts(cacheFile)
+                       .setTimeCharacteristic(timeCharacteristic)
+                       .setDefaultBufferTimeout(bufferTimeout);
        }
 
        /**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 9a11459..a227da3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -20,10 +20,13 @@ package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.operators.InputFormatOperatorFactory;
 import org.apache.flink.streaming.api.operators.OutputFormatOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
@@ -50,6 +53,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A generator that generates a {@link StreamGraph} from a graph of
  * {@link StreamTransformation StreamTransformations}.
@@ -84,10 +89,30 @@ public class StreamGraphGenerator {
 
        public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 
KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM;
 
-       // The StreamGraph that is being built, this is initialized at the 
beginning.
-       private final StreamGraph streamGraph;
+       public static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = 
TimeCharacteristic.ProcessingTime;
+
+       public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
+
+       /** The default buffer timeout (max delay of records in the network 
stack). */
+       public static final long DEFAULT_NETWORK_BUFFER_TIMEOUT = 100L;
+
+       private final List<StreamTransformation<?>> transformations;
+
+       private final ExecutionConfig executionConfig;
+
+       private final CheckpointConfig checkpointConfig;
+
+       private StateBackend stateBackend;
+
+       private boolean chaining = true;
+
+       private Collection<Tuple2<String, 
DistributedCache.DistributedCacheEntry>> userArtifacts;
 
-       private final StreamExecutionEnvironment env;
+       private TimeCharacteristic timeCharacteristic = 
DEFAULT_TIME_CHARACTERISTIC;
+
+       private long defaultBufferTimeout = DEFAULT_NETWORK_BUFFER_TIMEOUT;
+
+       private String jobName = DEFAULT_JOB_NAME;
 
        // This is used to assign a unique ID to iteration source/sink
        protected static Integer iterationIdCounter = 0;
@@ -96,46 +121,69 @@ public class StreamGraphGenerator {
                return iterationIdCounter;
        }
 
+       private StreamGraph streamGraph;
+
        // Keep track of which Transforms we have already transformed, this is 
necessary because
        // we have loops, i.e. feedback edges.
        private Map<StreamTransformation<?>, Collection<Integer>> 
alreadyTransformed;
 
+       public StreamGraphGenerator(List<StreamTransformation<?>> 
transformations, ExecutionConfig executionConfig, CheckpointConfig 
checkpointConfig) {
+               this.transformations = checkNotNull(transformations);
+               this.executionConfig = checkNotNull(executionConfig);
+               this.checkpointConfig = checkNotNull(checkpointConfig);
+       }
 
-       /**
-        * Private constructor. The generator should only be invoked using 
{@link #generate}.
-        */
-       private StreamGraphGenerator(StreamExecutionEnvironment env) {
-               streamGraph = new StreamGraph(env.getConfig(), 
env.getCheckpointConfig());
-               streamGraph.setChaining(env.isChainingEnabled());
-               streamGraph.setStateBackend(env.getStateBackend());
-               streamGraph.setUserArtifacts(env.getCachedFiles());
-               
streamGraph.setTimeCharacteristic(env.getStreamTimeCharacteristic());
-               this.env = env;
-               this.alreadyTransformed = new HashMap<>();
+       public StreamGraphGenerator setStateBackend(StateBackend stateBackend) {
+               this.stateBackend = stateBackend;
+               return this;
        }
 
-       /**
-        * Generates a {@code StreamGraph} by traversing the graph of {@code 
StreamTransformations}
-        * starting from the given transformations.
-        *
-        * @param env The {@code StreamExecutionEnvironment} that is used to 
set some parameters of the
-        *            job
-        * @param transformations The transformations starting from which to 
transform the graph
-        *
-        * @return The generated {@code StreamGraph}
-        */
-       public static StreamGraph generate(StreamExecutionEnvironment env, 
List<StreamTransformation<?>> transformations) {
-               return new 
StreamGraphGenerator(env).generateInternal(transformations);
+       public StreamGraphGenerator setChaining(boolean chaining) {
+               this.chaining = chaining;
+               return this;
        }
 
-       /**
-        * This starts the actual transformation, beginning from the sinks.
-        */
-       private StreamGraph generateInternal(List<StreamTransformation<?>> 
transformations) {
+       public StreamGraphGenerator setUserArtifacts(Collection<Tuple2<String, 
DistributedCache.DistributedCacheEntry>> userArtifacts) {
+               this.userArtifacts = userArtifacts;
+               return this;
+       }
+
+       public StreamGraphGenerator setTimeCharacteristic(TimeCharacteristic 
timeCharacteristic) {
+               this.timeCharacteristic = timeCharacteristic;
+               return this;
+       }
+
+       public StreamGraphGenerator setDefaultBufferTimeout(long 
defaultBufferTimeout) {
+               this.defaultBufferTimeout = defaultBufferTimeout;
+               return this;
+       }
+
+       public StreamGraphGenerator setJobName(String jobName) {
+               this.jobName = jobName;
+               return this;
+       }
+
+       public StreamGraph generate() {
+               streamGraph = new StreamGraph(executionConfig, 
checkpointConfig);
+               streamGraph.setStateBackend(stateBackend);
+               streamGraph.setChaining(chaining);
+               streamGraph.setUserArtifacts(userArtifacts);
+               streamGraph.setTimeCharacteristic(timeCharacteristic);
+               streamGraph.setJobName(jobName);
+
+               alreadyTransformed = new HashMap<>();
+
                for (StreamTransformation<?> transformation: transformations) {
                        transform(transformation);
                }
-               return streamGraph;
+
+               final StreamGraph builtStreamGraph = streamGraph;
+
+               alreadyTransformed.clear();
+               alreadyTransformed = null;
+               streamGraph = null;
+
+               return builtStreamGraph;
        }
 
        /**
@@ -156,7 +204,7 @@ public class StreamGraphGenerator {
 
                        // if the max parallelism hasn't been set, then first 
use the job wide max parallelism
                        // from the ExecutionConfig.
-                       int globalMaxParallelismFromConfig = 
env.getConfig().getMaxParallelism();
+                       int globalMaxParallelismFromConfig = 
executionConfig.getMaxParallelism();
                        if (globalMaxParallelismFromConfig > 0) {
                                
transform.setMaxParallelism(globalMaxParallelismFromConfig);
                        }
@@ -201,7 +249,7 @@ public class StreamGraphGenerator {
                if (transform.getBufferTimeout() >= 0) {
                        streamGraph.setBufferTimeout(transform.getId(), 
transform.getBufferTimeout());
                } else {
-                       streamGraph.setBufferTimeout(transform.getId(), 
env.getBufferTimeout());
+                       streamGraph.setBufferTimeout(transform.getId(), 
defaultBufferTimeout);
                }
 
                if (transform.getUid() != null) {
@@ -383,8 +431,8 @@ public class StreamGraphGenerator {
                StreamNode itSink = itSourceAndSink.f1;
 
                // We set the proper serializers for the sink/source
-               streamGraph.setSerializers(itSource.getId(), null, null, 
iterate.getOutputType().createSerializer(env.getConfig()));
-               streamGraph.setSerializers(itSink.getId(), 
iterate.getOutputType().createSerializer(env.getConfig()), null, null);
+               streamGraph.setSerializers(itSource.getId(), null, null, 
iterate.getOutputType().createSerializer(executionConfig));
+               streamGraph.setSerializers(itSink.getId(), 
iterate.getOutputType().createSerializer(executionConfig), null, null);
 
                // also add the feedback source ID to the result IDs, so that 
downstream operators will
                // add both as input
@@ -448,8 +496,8 @@ public class StreamGraphGenerator {
                StreamNode itSink = itSourceAndSink.f1;
 
                // We set the proper serializers for the sink/source
-               streamGraph.setSerializers(itSource.getId(), null, null, 
coIterate.getOutputType().createSerializer(env.getConfig()));
-               streamGraph.setSerializers(itSink.getId(), 
coIterate.getOutputType().createSerializer(env.getConfig()), null, null);
+               streamGraph.setSerializers(itSource.getId(), null, null, 
coIterate.getOutputType().createSerializer(executionConfig));
+               streamGraph.setSerializers(itSink.getId(), 
coIterate.getOutputType().createSerializer(executionConfig), null, null);
 
                Collection<Integer> resultIds = 
Collections.singleton(itSource.getId());
 
@@ -497,7 +545,7 @@ public class StreamGraphGenerator {
                                        ((InputFormatOperatorFactory<T>) 
source.getOperatorFactory()).getInputFormat());
                }
                int parallelism = source.getParallelism() != 
ExecutionConfig.PARALLELISM_DEFAULT ?
-                       source.getParallelism() : env.getParallelism();
+                       source.getParallelism() : 
executionConfig.getParallelism();
                streamGraph.setParallelism(source.getId(), parallelism);
                streamGraph.setMaxParallelism(source.getId(), 
source.getMaxParallelism());
                return Collections.singleton(source.getId());
@@ -526,7 +574,7 @@ public class StreamGraphGenerator {
                }
 
                int parallelism = sink.getParallelism() != 
ExecutionConfig.PARALLELISM_DEFAULT ?
-                       sink.getParallelism() : env.getParallelism();
+                       sink.getParallelism() : 
executionConfig.getParallelism();
                streamGraph.setParallelism(sink.getId(), parallelism);
                streamGraph.setMaxParallelism(sink.getId(), 
sink.getMaxParallelism());
 
@@ -538,7 +586,7 @@ public class StreamGraphGenerator {
                }
 
                if (sink.getStateKeySelector() != null) {
-                       TypeSerializer<?> keySerializer = 
sink.getStateKeyType().createSerializer(env.getConfig());
+                       TypeSerializer<?> keySerializer = 
sink.getStateKeyType().createSerializer(executionConfig);
                        streamGraph.setOneInputStateKey(sink.getId(), 
sink.getStateKeySelector(), keySerializer);
                }
 
@@ -571,12 +619,12 @@ public class StreamGraphGenerator {
                                transform.getName());
 
                if (transform.getStateKeySelector() != null) {
-                       TypeSerializer<?> keySerializer = 
transform.getStateKeyType().createSerializer(env.getConfig());
+                       TypeSerializer<?> keySerializer = 
transform.getStateKeyType().createSerializer(executionConfig);
                        streamGraph.setOneInputStateKey(transform.getId(), 
transform.getStateKeySelector(), keySerializer);
                }
 
                int parallelism = transform.getParallelism() != 
ExecutionConfig.PARALLELISM_DEFAULT ?
-                       transform.getParallelism() : env.getParallelism();
+                       transform.getParallelism() : 
executionConfig.getParallelism();
                streamGraph.setParallelism(transform.getId(), parallelism);
                streamGraph.setMaxParallelism(transform.getId(), 
transform.getMaxParallelism());
 
@@ -620,12 +668,12 @@ public class StreamGraphGenerator {
                                transform.getName());
 
                if (transform.getStateKeySelector1() != null || 
transform.getStateKeySelector2() != null) {
-                       TypeSerializer<?> keySerializer = 
transform.getStateKeyType().createSerializer(env.getConfig());
+                       TypeSerializer<?> keySerializer = 
transform.getStateKeyType().createSerializer(executionConfig);
                        streamGraph.setTwoInputStateKey(transform.getId(), 
transform.getStateKeySelector1(), transform.getStateKeySelector2(), 
keySerializer);
                }
 
                int parallelism = transform.getParallelism() != 
ExecutionConfig.PARALLELISM_DEFAULT ?
-                       transform.getParallelism() : env.getParallelism();
+                       transform.getParallelism() : 
executionConfig.getParallelism();
                streamGraph.setParallelism(transform.getId(), parallelism);
                streamGraph.setMaxParallelism(transform.getId(), 
transform.getMaxParallelism());
 

Reply via email to