This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 41a9d7e8454510c50d440ddc4e79184152cac5cb
Author: Kostas Kloudas <kklou...@gmail.com>
AuthorDate: Fri Nov 1 13:20:14 2019 +0100

    Wired Executors to (Stream)ExecutionEnv
---
 .../execution/DefaultExecutorServiceLoader.java    | 78 ++++++++++++++++++++++
 .../flink/api/java/ExecutionEnvironment.java       | 32 ++++++++-
 .../environment/StreamExecutionEnvironment.java    | 33 ++++++++-
 3 files changed, 141 insertions(+), 2 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
 
b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
new file mode 100644
index 0000000..f737ff9
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.execution;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The default implementation of the {@link ExecutorServiceLoader}. This implementation uses
+ * Java service discovery to find the available {@link ExecutorFactory executor factories}.
+ *
+ * <p>Discovery is performed anew on every lookup, so instances of this class hold no shared
+ * mutable state.
+ */
+public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {
+
+       // TODO: 01.11.19 this code is almost identical to the ClusterClientServiceLoader and its default implementation.
+
+       private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutorServiceLoader.class);
+
+       /**
+        * Looks up the single {@link ExecutorFactory} that declares itself compatible with the given configuration.
+        *
+        * @param configuration the configuration every discovered factory is checked against; must not be null
+        * @return the only compatible factory, or {@code null} if none of the discovered factories is compatible
+        * @throws IllegalStateException if more than one discovered factory is compatible
+        */
+       @Override
+       public ExecutorFactory getExecutorFactory(final Configuration configuration) {
+               checkNotNull(configuration);
+
+               // A ServiceLoader instance is not safe for use by multiple concurrent threads, so a fresh
+               // one is created per lookup instead of sharing a single static instance. This also resolves
+               // providers against the context class loader of the calling thread.
+               final ServiceLoader<ExecutorFactory> loader = ServiceLoader.load(ExecutorFactory.class);
+
+               final List<ExecutorFactory> compatibleFactories = new ArrayList<>();
+               final Iterator<ExecutorFactory> factories = loader.iterator();
+               while (factories.hasNext()) {
+                       try {
+                               final ExecutorFactory factory = factories.next();
+                               if (factory != null && factory.isCompatibleWith(configuration)) {
+                                       compatibleFactories.add(factory);
+                               }
+                       } catch (Throwable e) {
+                               // A provider that fails because of a NoClassDefFoundError is skipped;
+                               // every other failure is propagated unchanged.
+                               if (e.getCause() instanceof NoClassDefFoundError) {
+                                       LOG.info("Could not load factory due to missing dependencies.");
+                               } else {
+                                       throw e;
+                               }
+                       }
+               }
+
+               if (compatibleFactories.size() > 1) {
+                       // List the configuration entries so the user can see what made several factories match.
+                       final List<String> configStr =
+                                       configuration.toMap().entrySet().stream()
+                                                       .map(e -> e.getKey() + "=" + e.getValue())
+                                                       .collect(Collectors.toList());
+
+                       throw new IllegalStateException("Multiple compatible executor factories found for:\n" + String.join("\n", configStr) + ".");
+               }
+
+               // Callers must be prepared for null: it signals that no factory matched.
+               return compatibleFactories.isEmpty() ? null : compatibleFactories.get(0);
+       }
+}
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index ba64bff..05ef6ee 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -50,7 +50,12 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.ExecutorFactory;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.NumberSequenceIterator;
@@ -122,11 +127,24 @@ public abstract class ExecutionEnvironment {
        /** Flag to indicate whether sinks have been cleared in previous 
executions. */
        private boolean wasExecuted = false;
 
+       /** Loader queried in {@code execute(String)} for the {@link ExecutorFactory} matching the executor configuration. */
+       private final ExecutorServiceLoader executorServiceLoader;
+
+       /** Configuration passed to the service loader, the executor factory and the executor when a job is executed. */
+       private final Configuration executorConfiguration;
+
        /**
         * Creates a new Execution Environment.
         */
        protected ExecutionEnvironment() {
+               // Delegates with a freshly created Configuration; the delegate supplies the default service loader.
+               this(new Configuration());
+       }
+
+       /**
+        * Creates a new Execution Environment that discovers its executor through a
+        * {@link DefaultExecutorServiceLoader}.
+        *
+        * @param executorConfiguration the configuration used to select and configure the executor; must not be null
+        */
+       protected ExecutionEnvironment(final Configuration 
executorConfiguration) {
+               this(new DefaultExecutorServiceLoader(), executorConfiguration);
+       }
 
+       /**
+        * Creates a new Execution Environment with an explicit executor service loader.
+        *
+        * @param executorServiceLoader the loader used to find the {@link ExecutorFactory}; must not be null
+        * @param executorConfiguration the configuration used to select and configure the executor; must not be null
+        */
+       protected ExecutionEnvironment(final ExecutorServiceLoader 
executorServiceLoader, final Configuration executorConfiguration) {
+               this.executorServiceLoader = 
checkNotNull(executorServiceLoader);
+               this.executorConfiguration = 
checkNotNull(executorConfiguration);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -761,7 +779,19 @@ public abstract class ExecutionEnvironment {
         * @return The result of the job execution, containing elapsed time and 
accumulators.
         * @throws Exception Thrown, if the program executions fails.
         */
-       public abstract JobExecutionResult execute(String jobName) throws 
Exception;
+       public JobExecutionResult execute(String jobName) throws Exception {
+               // Without a target there is nothing the service loader could match a factory against.
+               if (executorConfiguration.get(ExecutionOptions.TARGET) == null) {
+                       throw new RuntimeException("No execution.target specified in your configuration file.");
+               }
+
+               final Plan plan = createProgramPlan(jobName);
+               final ExecutorFactory executorFactory =
+                               executorServiceLoader.getExecutorFactory(executorConfiguration);
+
+               // A loader may return null (the default one does so when no factory is compatible);
+               // fail with a descriptive error instead of a bare NullPointerException below.
+               if (executorFactory == null) {
+                       throw new IllegalStateException(
+                                       "No compatible ExecutorFactory found for the configured execution.target.");
+               }
+
+               final Executor executor = executorFactory.getExecutor(executorConfiguration);
+               // Remember the result on the environment before handing it back to the caller.
+               lastJobExecutionResult = executor.execute(plan, executorConfiguration); // TODO: 23.10.19 here I may need to consolidate executionConfig
+               return lastJobExecutionResult;
+       }
 
        /**
         * Creates the plan with which the system will execute the program, and 
returns it as
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 2485df4..9108d2d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -46,7 +46,12 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.OptimizerPlanEnvironment;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.ExecutorFactory;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -144,11 +149,27 @@ public abstract class StreamExecutionEnvironment {
 
        protected final List<Tuple2<String, 
DistributedCache.DistributedCacheEntry>> cacheFile = new ArrayList<>();
 
+       /** Loader queried in {@code execute(StreamGraph)} for the {@link ExecutorFactory} matching the executor configuration. */
+       private final ExecutorServiceLoader executorServiceLoader;
+
+       /** Configuration passed to the service loader, the executor factory and the executor when a job is executed. */
+       private final Configuration executorConfiguration;
 
        // 
--------------------------------------------------------------------------------------------
        // Constructor and Properties
        // 
--------------------------------------------------------------------------------------------
 
+       /**
+        * Creates a stream execution environment with a freshly created {@link Configuration} as
+        * executor configuration.
+        */
+       public StreamExecutionEnvironment() {
+               this(new Configuration());
+       }
+
+       /**
+        * Creates a stream execution environment that discovers its executor through a
+        * {@link DefaultExecutorServiceLoader}.
+        *
+        * @param executorConfiguration the configuration used to select and configure the executor; must not be null
+        */
+       public StreamExecutionEnvironment(final Configuration 
executorConfiguration) {
+               this(new DefaultExecutorServiceLoader(), executorConfiguration);
+       }
+
+       /**
+        * Creates a stream execution environment with an explicit executor service loader.
+        *
+        * @param executorServiceLoader the loader used to find the {@link ExecutorFactory}; must not be null
+        * @param executorConfiguration the configuration used to select and configure the executor; must not be null
+        */
+       public StreamExecutionEnvironment(final ExecutorServiceLoader 
executorServiceLoader, final Configuration executorConfiguration) {
+               this.executorServiceLoader = 
checkNotNull(executorServiceLoader);
+               this.executorConfiguration = 
checkNotNull(executorConfiguration);
+       }
+
        /**
         * Gets the config object.
         */
@@ -1515,7 +1536,17 @@ public abstract class StreamExecutionEnvironment {
         * @throws Exception which occurs during job execution.
         */
        @Internal
-       public abstract JobExecutionResult execute(StreamGraph streamGraph) 
throws Exception;
+       public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
+               // Without a target there is nothing the service loader could match a factory against.
+               if (executorConfiguration.get(ExecutionOptions.TARGET) == null) {
+                       throw new RuntimeException("No execution.target specified in your configuration file.");
+               }
+
+               final ExecutorFactory executorFactory =
+                               executorServiceLoader.getExecutorFactory(executorConfiguration);
+
+               // A loader may return null (the default one does so when no factory is compatible);
+               // fail with a descriptive error instead of a bare NullPointerException below.
+               if (executorFactory == null) {
+                       throw new IllegalStateException(
+                                       "No compatible ExecutorFactory found for the configured execution.target.");
+               }
+
+               final Executor executor = executorFactory.getExecutor(executorConfiguration);
+               return executor.execute(streamGraph, executorConfiguration); // TODO: 23.10.19 here I may need to consolidate executionConfig
+       }
 
        /**
         * Getter of the {@link 
org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.

Reply via email to