[FLINK-5141] [streaming api] Implement LocalStreamEnvironment for new mini cluster.
This closes #2877 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0086b57 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0086b57 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0086b57 Branch: refs/heads/flip-6 Commit: c0086b57eec63bab627383205eed2ff8636c5394 Parents: 4afcc4a Author: biao.liub <[email protected]> Authored: Wed Nov 23 17:02:11 2016 +0800 Committer: Stephan Ewen <[email protected]> Committed: Mon Dec 5 02:49:43 2016 +0100 ---------------------------------------------------------------------- .../Flip6LocalStreamEnvironment.java | 128 +++++++++++++++++++ 1 file changed, 128 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c0086b57/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java new file mode 100644 index 0000000..a0c128e --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.environment; + +import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The Flip6LocalStreamEnvironment is a StreamExecutionEnvironment that runs the program locally, + * multi-threaded, in the JVM where the environment is instantiated. It spawns an embedded + * Flink cluster in the background and executes the program on that cluster. + * + * <p>When this environment is instantiated, it uses a default parallelism of {@code 1}. The default + * parallelism can be set via {@link #setParallelism(int)}. 
+ */ +@Public +public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment { + + private static final Logger LOG = LoggerFactory.getLogger(Flip6LocalStreamEnvironment.class); + + /** The configuration to use for the mini cluster */ + private final Configuration conf; + + /** + * Creates a new mini cluster stream environment that uses the default configuration. + */ + public Flip6LocalStreamEnvironment() { + this(null); + } + + /** + * Creates a new mini cluster stream environment that configures its local executor with the given configuration. + * + * @param config The configuration used to configure the local executor. + */ + public Flip6LocalStreamEnvironment(Configuration config) { + if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) { + throw new InvalidProgramException( + "The Flip6LocalStreamEnvironment cannot be used when submitting a program through a client, " + + "or running in a TestEnvironment context."); + } + + this.conf = config == null ? new Configuration() : config; + } + + /** + * Executes the JobGraph of the streaming program on a mini cluster with a user + * specified name. + * + * @param jobName + * name of the job + * @return The result of the job execution, containing elapsed time and accumulators. + */ + @Override + public JobExecutionResult execute(String jobName) throws Exception { + // transform the streaming program into a JobGraph + StreamGraph streamGraph = getStreamGraph(); + streamGraph.setJobName(jobName); + + JobGraph jobGraph = streamGraph.getJobGraph(); + + jobGraph.setAllowQueuedScheduling(true); + + // As described in JIRA issue FLINK-5140, + // we have to set a restart strategy to handle NoResourceAvailableException. 
+ ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.setRestartStrategy( + RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000)); + jobGraph.setExecutionConfig(executionConfig); + + Configuration configuration = new Configuration(); + configuration.addAll(jobGraph.getJobConfiguration()); + configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L); + + // add (and override) the settings with what the user defined + configuration.addAll(this.conf); + + MiniClusterConfiguration cfg = new MiniClusterConfiguration(configuration); + + // Slots are currently not reused, so we need to sum up the parallelism of all vertices. + int slotsCount = 0; + for (JobVertex jobVertex : jobGraph.getVertices()) { + slotsCount += jobVertex.getParallelism(); + } + cfg.setNumTaskManagerSlots(slotsCount); + + if (LOG.isInfoEnabled()) { + LOG.info("Running job on local embedded Flink mini cluster"); + } + + MiniCluster miniCluster = new MiniCluster(cfg); + try { + miniCluster.start(); + return miniCluster.runJobBlocking(jobGraph); + } finally { + transformations.clear(); + miniCluster.shutdown(); + } + } +}
