This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executors in repository https://gitbox.apache.org/repos/asf/flink.git
commit 41a9d7e8454510c50d440ddc4e79184152cac5cb Author: Kostas Kloudas <kklou...@gmail.com> AuthorDate: Fri Nov 1 13:20:14 2019 +0100 Wired Executors to (Stream)ExecutionEnv --- .../execution/DefaultExecutorServiceLoader.java | 78 ++++++++++++++++++++++ .../flink/api/java/ExecutionEnvironment.java | 32 ++++++++- .../environment/StreamExecutionEnvironment.java | 33 ++++++++- 3 files changed, 141 insertions(+), 2 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java new file mode 100644 index 0000000..f737ff9 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.execution; + +import org.apache.flink.configuration.Configuration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.ServiceLoader; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The default implementation of the {@link ExecutorServiceLoader}. This implementation uses + * Java service discovery to find the available {@link ExecutorFactory executor factories}. + */ +public class DefaultExecutorServiceLoader implements ExecutorServiceLoader { + + // TODO: 01.11.19 this code is almost identical to the ClusterClientServiceLoader and its default implementation. + + private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutorServiceLoader.class); + + /* NOTE(review): one static ServiceLoader is shared by every call and every instance of this class; the java.util.ServiceLoader Javadoc states its instances are not safe for use by multiple concurrent threads - confirm getExecutorFactory is never called concurrently, or create the loader per call. */ private static final ServiceLoader<ExecutorFactory> defaultLoader = ServiceLoader.load(ExecutorFactory.class); + + /** Returns the single {@link ExecutorFactory} discovered through {@link ServiceLoader} whose {@code isCompatibleWith(configuration)} returns {@code true}, or {@code null} if there is none. A Throwable raised while loading a factory or while checking its compatibility is logged and skipped if its cause is a {@link NoClassDefFoundError}, and rethrown otherwise. @throws IllegalStateException if more than one compatible factory is found. */ @Override + public ExecutorFactory getExecutorFactory(final Configuration configuration) { + checkNotNull(configuration); + + final List<ExecutorFactory> compatibleFactories = new ArrayList<>(); + final Iterator<ExecutorFactory> factories = defaultLoader.iterator(); + while (factories.hasNext()) { + try { + final ExecutorFactory factory = factories.next(); + if (factory != null && factory.isCompatibleWith(configuration)) { + compatibleFactories.add(factory); + } + } catch (Throwable e) { + if (e.getCause() instanceof NoClassDefFoundError) { + /* NOTE(review): the caught Throwable is not passed to LOG.info, so the name of the missing class is lost from the log; consider passing e as the last logger argument. */ LOG.info("Could not load factory due to missing dependencies."); + } else { + throw e; + } + } + } + + if (compatibleFactories.size() > 1) { + final List<String> configStr = + configuration.toMap().entrySet().stream() + .map(e -> e.getKey() + "=" + e.getValue()) + .collect(Collectors.toList()); + + /* NOTE(review): this message says client factories although this loader resolves executor factories (apparently copied from the ClusterClientServiceLoader mentioned in the TODO above); it also prints every configuration entry, which may include credentials. */ throw new IllegalStateException("Multiple compatible client factories found for:\n" + String.join("\n", configStr) + "."); + } + + return 
/* NOTE(review): null is returned when no compatible factory exists; the execute(String) and execute(StreamGraph) implementations added in this commit call getExecutor on the result without a null check, so that case surfaces as a NullPointerException. */ compatibleFactories.isEmpty() ? null : compatibleFactories.get(0); + } +} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index ba64bff..05ef6ee 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -50,7 +50,12 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.ValueTypeInfo; import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.core.execution.DefaultExecutorServiceLoader; +import org.apache.flink.core.execution.Executor; +import org.apache.flink.core.execution.ExecutorFactory; +import org.apache.flink.core.execution.ExecutorServiceLoader; import org.apache.flink.core.fs.Path; import org.apache.flink.types.StringValue; import org.apache.flink.util.NumberSequenceIterator; @@ -122,11 +127,24 @@ public abstract class ExecutionEnvironment { /** Flag to indicate whether sinks have been cleared in previous executions. */ private boolean wasExecuted = false; + /** Loader used by execute(String) to find the ExecutorFactory for {@link #executorConfiguration}. */ private final ExecutorServiceLoader executorServiceLoader; + + /** Configuration handed to the loader, to the selected ExecutorFactory and to the resulting Executor in execute(String). */ private final Configuration executorConfiguration; + /** * Creates a new Execution Environment.
*/ protected ExecutionEnvironment() { + /* NOTE(review): a new default Configuration is used here; unless ExecutionOptions.TARGET has a default value (not visible in this diff), execute(String) throws for environments created this way unless a subclass overrides it. */ this(new Configuration()); + } + + /** Creates an environment that resolves executors through a {@link DefaultExecutorServiceLoader} using the given executor configuration. */ protected ExecutionEnvironment(final Configuration executorConfiguration) { + this(new DefaultExecutorServiceLoader(), executorConfiguration); + } + /** Creates an environment that resolves executors through the given loader using the given executor configuration; neither argument may be null. */ protected ExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) { + this.executorServiceLoader = checkNotNull(executorServiceLoader); + this.executorConfiguration = checkNotNull(executorConfiguration); } // -------------------------------------------------------------------------------------------- @@ -761,7 +779,19 @@ public abstract class ExecutionEnvironment { * @return The result of the job execution, containing elapsed time and accumulators. * @throws Exception Thrown, if the program executions fails. */ - public abstract JobExecutionResult execute(String jobName) throws Exception; + public JobExecutionResult execute(String jobName) throws Exception { + /* NOTE(review): the configuration comes from the constructor, not necessarily from a file, so "configuration file" in this message may mislead; IllegalStateException would also be more specific than a bare RuntimeException. */ if (executorConfiguration.get(ExecutionOptions.TARGET) == null) { + throw new RuntimeException("No execution.target specified in your configuration file."); + } + + final Plan plan = createProgramPlan(jobName); + final ExecutorFactory executorFactory = + executorServiceLoader.getExecutorFactory(executorConfiguration); + + /* NOTE(review): executorFactory is null when getExecutorFactory finds no compatible factory; check it and fail with a descriptive message instead of a NullPointerException in the call below. */ final Executor executor = executorFactory.getExecutor(executorConfiguration); + lastJobExecutionResult = executor.execute(plan, executorConfiguration); // TODO: 23.10.19 here I may need to consolidate executionConfig + return lastJobExecutionResult; + } /** * Creates the plan with which the system will execute the program, and returns it as diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 2485df4..9108d2d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -46,7 +46,12 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.client.program.ContextEnvironment; import org.apache.flink.client.program.OptimizerPlanEnvironment; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.core.execution.DefaultExecutorServiceLoader; +import org.apache.flink.core.execution.Executor; +import org.apache.flink.core.execution.ExecutorFactory; +import org.apache.flink.core.execution.ExecutorServiceLoader; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; @@ -144,11 +149,27 @@ public abstract class StreamExecutionEnvironment { protected final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile = new ArrayList<>(); + /** Loader used by execute(StreamGraph) to find the ExecutorFactory for {@link #executorConfiguration}. */ private final ExecutorServiceLoader executorServiceLoader; + + /** Configuration handed to the loader, to the selected ExecutorFactory and to the resulting Executor in execute(StreamGraph). */ private final Configuration executorConfiguration; // -------------------------------------------------------------------------------------------- // Constructor and Properties // -------------------------------------------------------------------------------------------- + /** Creates an environment with a new default {@code Configuration} as its executor configuration. NOTE(review): these constructors are public while the matching ExecutionEnvironment constructors are protected; since this class is abstract, protected would be consistent. */ public StreamExecutionEnvironment() { + this(new Configuration()); + } + + /** Creates an environment that resolves executors through a {@link DefaultExecutorServiceLoader} using the given executor configuration. */ public StreamExecutionEnvironment(final Configuration executorConfiguration) { + this(new DefaultExecutorServiceLoader(), executorConfiguration); + } + + /** Creates an environment that resolves executors through the given loader using the given executor configuration; neither argument may be null. */ public StreamExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) { + this.executorServiceLoader = checkNotNull(executorServiceLoader); + this.executorConfiguration = checkNotNull(executorConfiguration); + } + /** * Gets the config object.
*/ @@ -1515,7 +1536,17 @@ public abstract class StreamExecutionEnvironment { * @throws Exception which occurs during job execution. */ @Internal - public abstract JobExecutionResult execute(StreamGraph streamGraph) throws Exception; + public JobExecutionResult execute(StreamGraph streamGraph) throws Exception { + /* NOTE(review): the configuration comes from the constructor, not necessarily from a file, so "configuration file" in this message may mislead; IllegalStateException would also be more specific than a bare RuntimeException. */ if (executorConfiguration.get(ExecutionOptions.TARGET) == null) { + throw new RuntimeException("No execution.target specified in your configuration file."); + } + + final ExecutorFactory executorFactory = + executorServiceLoader.getExecutorFactory(executorConfiguration); + + /* NOTE(review): executorFactory is null when getExecutorFactory finds no compatible factory; check it and fail with a descriptive message instead of a NullPointerException in the call below. */ final Executor executor = executorFactory.getExecutor(executorConfiguration); + return executor.execute(streamGraph, executorConfiguration); // TODO: 23.10.19 here I may need to consolidate executionConfig + } /** * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.