This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f95db6bbaa1cabf86f624340068b41e7b4018862 Author: Aljoscha Krettek <[email protected]> AuthorDate: Fri Jun 14 19:06:07 2019 +0800 [FLINK-12686][datastream] Remove StreamExecutionEnvironment from StreamGraphGenerator --- .../environment/StreamExecutionEnvironment.java | 11 +- .../streaming/api/graph/StreamGraphGenerator.java | 138 ++++++++++++++------- 2 files changed, 103 insertions(+), 46 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index f8330f4..6c334a6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -1511,10 +1511,19 @@ public abstract class StreamExecutionEnvironment { */ @Internal public StreamGraph getStreamGraph() { + return getStreamGraphGenerator().generate(); + } + + private StreamGraphGenerator getStreamGraphGenerator() { if (transformations.size() <= 0) { throw new IllegalStateException("No operators defined in streaming topology. Cannot execute."); } - return StreamGraphGenerator.generate(this, transformations); + return new StreamGraphGenerator(transformations, config, checkpointCfg) + .setStateBackend(defaultStateBackend) + .setChaining(isChainingEnabled) + .setUserArtifacts(cacheFile) + .setTimeCharacteristic(timeCharacteristic) + .setDefaultBufferTimeout(bufferTimeout); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index 9a11459..a227da3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -20,10 +20,13 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.operators.InputFormatOperatorFactory; import org.apache.flink.streaming.api.operators.OutputFormatOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; @@ -50,6 +53,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * A generator that generates a {@link StreamGraph} from a graph of * {@link StreamTransformation StreamTransformations}. @@ -84,10 +89,30 @@ public class StreamGraphGenerator { public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM; - // The StreamGraph that is being built, this is initialized at the beginning. - private final StreamGraph streamGraph; + public static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime; + + public static final String DEFAULT_JOB_NAME = "Flink Streaming Job"; + + /** The default buffer timeout (max delay of records in the network stack). */ + public static final long DEFAULT_NETWORK_BUFFER_TIMEOUT = 100L; + + private final List<StreamTransformation<?>> transformations; + + private final ExecutionConfig executionConfig; + + private final CheckpointConfig checkpointConfig; + + private StateBackend stateBackend; + + private boolean chaining = true; + + private Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts; - private final StreamExecutionEnvironment env; + private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC; + + private long defaultBufferTimeout = DEFAULT_NETWORK_BUFFER_TIMEOUT; + + private String jobName = DEFAULT_JOB_NAME; // This is used to assign a unique ID to iteration source/sink protected static Integer iterationIdCounter = 0; @@ -96,46 +121,69 @@ public class StreamGraphGenerator { return iterationIdCounter; } + private StreamGraph streamGraph; + // Keep track of which Transforms we have already transformed, this is necessary because // we have loops, i.e. feedback edges. private Map<StreamTransformation<?>, Collection<Integer>> alreadyTransformed; + public StreamGraphGenerator(List<StreamTransformation<?>> transformations, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig) { + this.transformations = checkNotNull(transformations); + this.executionConfig = checkNotNull(executionConfig); + this.checkpointConfig = checkNotNull(checkpointConfig); + } - /** - * Private constructor. The generator should only be invoked using {@link #generate}. - */ - private StreamGraphGenerator(StreamExecutionEnvironment env) { - streamGraph = new StreamGraph(env.getConfig(), env.getCheckpointConfig()); - streamGraph.setChaining(env.isChainingEnabled()); - streamGraph.setStateBackend(env.getStateBackend()); - streamGraph.setUserArtifacts(env.getCachedFiles()); - streamGraph.setTimeCharacteristic(env.getStreamTimeCharacteristic()); - this.env = env; - this.alreadyTransformed = new HashMap<>(); + public StreamGraphGenerator setStateBackend(StateBackend stateBackend) { + this.stateBackend = stateBackend; + return this; } - /** - * Generates a {@code StreamGraph} by traversing the graph of {@code StreamTransformations} - * starting from the given transformations. - * - * @param env The {@code StreamExecutionEnvironment} that is used to set some parameters of the - * job - * @param transformations The transformations starting from which to transform the graph - * - * @return The generated {@code StreamGraph} - */ - public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) { - return new StreamGraphGenerator(env).generateInternal(transformations); + public StreamGraphGenerator setChaining(boolean chaining) { + this.chaining = chaining; + return this; } - /** - * This starts the actual transformation, beginning from the sinks. - */ - private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) { + public StreamGraphGenerator setUserArtifacts(Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts) { + this.userArtifacts = userArtifacts; + return this; + } + + public StreamGraphGenerator setTimeCharacteristic(TimeCharacteristic timeCharacteristic) { + this.timeCharacteristic = timeCharacteristic; + return this; + } + + public StreamGraphGenerator setDefaultBufferTimeout(long defaultBufferTimeout) { + this.defaultBufferTimeout = defaultBufferTimeout; + return this; + } + + public StreamGraphGenerator setJobName(String jobName) { + this.jobName = jobName; + return this; + } + + public StreamGraph generate() { + streamGraph = new StreamGraph(executionConfig, checkpointConfig); + streamGraph.setStateBackend(stateBackend); + streamGraph.setChaining(chaining); + streamGraph.setUserArtifacts(userArtifacts); + streamGraph.setTimeCharacteristic(timeCharacteristic); + streamGraph.setJobName(jobName); + + alreadyTransformed = new HashMap<>(); + for (StreamTransformation<?> transformation: transformations) { transform(transformation); } - return streamGraph; + + final StreamGraph builtStreamGraph = streamGraph; + + alreadyTransformed.clear(); + alreadyTransformed = null; + streamGraph = null; + + return builtStreamGraph; } /** @@ -156,7 +204,7 @@ public class StreamGraphGenerator { // if the max parallelism hasn't been set, then first use the job wide max parallelism // from the ExecutionConfig. - int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism(); + int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism(); if (globalMaxParallelismFromConfig > 0) { transform.setMaxParallelism(globalMaxParallelismFromConfig); } @@ -201,7 +249,7 @@ public class StreamGraphGenerator { if (transform.getBufferTimeout() >= 0) { streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout()); } else { - streamGraph.setBufferTimeout(transform.getId(), env.getBufferTimeout()); + streamGraph.setBufferTimeout(transform.getId(), defaultBufferTimeout); } if (transform.getUid() != null) { @@ -383,8 +431,8 @@ public class StreamGraphGenerator { StreamNode itSink = itSourceAndSink.f1; // We set the proper serializers for the sink/source - streamGraph.setSerializers(itSource.getId(), null, null, iterate.getOutputType().createSerializer(env.getConfig())); - streamGraph.setSerializers(itSink.getId(), iterate.getOutputType().createSerializer(env.getConfig()), null, null); + streamGraph.setSerializers(itSource.getId(), null, null, iterate.getOutputType().createSerializer(executionConfig)); + streamGraph.setSerializers(itSink.getId(), iterate.getOutputType().createSerializer(executionConfig), null, null); // also add the feedback source ID to the result IDs, so that downstream operators will // add both as input @@ -448,8 +496,8 @@ public class StreamGraphGenerator { StreamNode itSink = itSourceAndSink.f1; // We set the proper serializers for the sink/source - streamGraph.setSerializers(itSource.getId(), null, null, coIterate.getOutputType().createSerializer(env.getConfig())); - streamGraph.setSerializers(itSink.getId(), coIterate.getOutputType().createSerializer(env.getConfig()), null, null); + streamGraph.setSerializers(itSource.getId(), null, null, coIterate.getOutputType().createSerializer(executionConfig)); + streamGraph.setSerializers(itSink.getId(), coIterate.getOutputType().createSerializer(executionConfig), null, null); Collection<Integer> resultIds = Collections.singleton(itSource.getId()); @@ -497,7 +545,7 @@ public class StreamGraphGenerator { ((InputFormatOperatorFactory<T>) source.getOperatorFactory()).getInputFormat()); } int parallelism = source.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ? - source.getParallelism() : env.getParallelism(); + source.getParallelism() : executionConfig.getParallelism(); streamGraph.setParallelism(source.getId(), parallelism); streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism()); return Collections.singleton(source.getId()); @@ -526,7 +574,7 @@ public class StreamGraphGenerator { } int parallelism = sink.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ? - sink.getParallelism() : env.getParallelism(); + sink.getParallelism() : executionConfig.getParallelism(); streamGraph.setParallelism(sink.getId(), parallelism); streamGraph.setMaxParallelism(sink.getId(), sink.getMaxParallelism()); @@ -538,7 +586,7 @@ public class StreamGraphGenerator { } if (sink.getStateKeySelector() != null) { - TypeSerializer<?> keySerializer = sink.getStateKeyType().createSerializer(env.getConfig()); + TypeSerializer<?> keySerializer = sink.getStateKeyType().createSerializer(executionConfig); streamGraph.setOneInputStateKey(sink.getId(), sink.getStateKeySelector(), keySerializer); } @@ -571,12 +619,12 @@ public class StreamGraphGenerator { transform.getName()); if (transform.getStateKeySelector() != null) { - TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig()); + TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig); streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer); } int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ? - transform.getParallelism() : env.getParallelism(); + transform.getParallelism() : executionConfig.getParallelism(); streamGraph.setParallelism(transform.getId(), parallelism); streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism()); @@ -620,12 +668,12 @@ public class StreamGraphGenerator { transform.getName()); if (transform.getStateKeySelector1() != null || transform.getStateKeySelector2() != null) { - TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig()); + TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig); streamGraph.setTwoInputStateKey(transform.getId(), transform.getStateKeySelector1(), transform.getStateKeySelector2(), keySerializer); } int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ? - transform.getParallelism() : env.getParallelism(); + transform.getParallelism() : executionConfig.getParallelism(); streamGraph.setParallelism(transform.getId(), parallelism); streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
