This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc182735564bf6a6a0fdd687ec260783ceeda8e2
Author: Kostas Kloudas <kklou...@gmail.com>
AuthorDate: Mon Nov 18 17:10:15 2019 +0100

    [FLINK-XXXXX] Add the Yarn/Standalone Executors
---
 .../deployment/AbstractJobClusterExecutor.java     | 68 ++++++++++++++++-
 .../deployment/AbstractSessionClusterExecutor.java | 65 ++++++++++++++++-
 .../flink/client/deployment/ExecutorUtils.java     | 55 ++++++++++++++
 .../flink/client/deployment/JobClientImpl.java     | 85 +++++++++++++++++++++-
 .../StandaloneSessionClusterExecutor.java          | 38 ++++++++++
 .../StandaloneSessionClusterExecutorFactory.java   | 45 ++++++++++++
 ...org.apache.flink.core.execution.ExecutorFactory | 16 ++++
 .../yarn/executors/YarnJobClusterExecutor.java     | 41 +++++++++++
 .../executors/YarnJobClusterExecutorFactory.java   | 45 ++++++++++++
 .../yarn/executors/YarnSessionClusterExecutor.java | 39 ++++++++++
 .../YarnSessionClusterExecutorFactory.java         | 45 ++++++++++++
 ...org.apache.flink.core.execution.ExecutorFactory | 17 +++++
 12 files changed, 556 insertions(+), 3 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
index 14a93bc..bca403a 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
@@ -1,4 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.client.deployment;
 
-public class AbstractJobClusterExecutor {
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An abstract {@link Executor} used to execute {@link Pipeline pipelines} on dedicated (per-job) clusters.
+ *
+ * <p>Every call to {@link #execute(Pipeline, Configuration)} deploys a job cluster for the submitted job
+ * through the {@link ClusterDescriptor} created by the configured {@link ClusterClientFactory}.
+ *
+ * @param <ClusterID> the type of the id of the cluster.
+ * @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create/retrieve a client to the target cluster.
+ */
+@Internal
+public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements Executor {
+
+       private static final Logger LOG = LoggerFactory.getLogger(AbstractJobClusterExecutor.class);
+
+       private final ClientFactory clusterClientFactory;
+
+       /**
+        * Creates an executor that deploys job clusters through the given factory.
+        *
+        * @param clusterClientFactory the factory providing the cluster descriptor and the cluster specification; must not be {@code null}.
+        */
+       public AbstractJobClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
+               this.clusterClientFactory = checkNotNull(clusterClientFactory);
+       }
+
+       /**
+        * Translates the pipeline into a {@link JobGraph} and deploys a job cluster for it.
+        *
+        * @param pipeline the pipeline to execute.
+        * @param configuration the configuration used to build the job graph, the cluster descriptor,
+        *                         the cluster specification and to read the detached-mode flag.
+        * @return an already completed future holding a {@link JobClient} bound to the deployed cluster and the job's id.
+        * @throws Exception if the job graph cannot be created or the deployment fails.
+        */
+       @Override
+       public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
+               final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
+
+               // The descriptor is only needed for the deployment itself, so it is closed when this block exits.
+               try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+                       final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+
+                       final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
+
+                       final ClusterClient<ClusterID> clusterClient = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
+                       // Parameterized logging: the message is only assembled when INFO is enabled.
+                       LOG.info("Job has been submitted with JobID {}", jobGraph.getJobID());
+                       return CompletableFuture.completedFuture(new JobClientImpl<>(clusterClient, jobGraph.getJobID()));
+               }
+       }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
index ab8cc01..413ff58 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
@@ -1,4 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.client.deployment;
 
-public class AbstractSessionClusterExecutor {
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An abstract {@link Executor} used to execute {@link Pipeline pipelines} on an existing (session) cluster.
+ *
+ * <p>The target cluster is identified by the cluster id that the configured {@link ClusterClientFactory}
+ * extracts from the {@link Configuration} passed to {@link #execute(Pipeline, Configuration)}.
+ *
+ * @param <ClusterID> the type of the id of the cluster.
+ * @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create/retrieve a client to the target cluster.
+ */
+@Internal
+public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements Executor {
+
+       private final ClientFactory clusterClientFactory;
+
+       /**
+        * Creates an executor that locates the session cluster through the given factory.
+        *
+        * @param clusterClientFactory the factory providing the cluster descriptor and the cluster id; must not be {@code null}.
+        */
+       public AbstractSessionClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
+               this.clusterClientFactory = checkNotNull(clusterClientFactory);
+       }
+
+       /**
+        * Translates the pipeline into a {@link JobGraph} and submits it to the cluster named in the configuration.
+        *
+        * @param pipeline the pipeline to execute.
+        * @param configuration the configuration used to build the job graph and to look up the target cluster.
+        * @return a future that completes with a {@link JobClient} for the submitted job once the submission result is available.
+        * @throws IllegalStateException if the factory returns no cluster id for the configuration.
+        * @throws Exception if the job graph cannot be created or the cluster client cannot be retrieved.
+        */
+       @Override
+       public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
+               final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
+
+               try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+                       final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
+                       // Fail with an explanation rather than a bare IllegalStateException.
+                       checkState(clusterID != null, "No cluster id could be retrieved from the configuration; a session executor requires an existing cluster.");
+
+                       final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID);
+                       // NOTE(review): the client is handed to the JobClientImpl only on success; if the
+                       // submission fails nothing here closes it - confirm who owns it in that case.
+                       return clusterClient
+                                       .submitJob(jobGraph)
+                                       .thenApply(JobSubmissionResult::getJobID)
+                                       .thenApply(jobID -> new JobClientImpl<>(clusterClient, jobID));
+               }
+       }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
index e134206..4541b1c 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
@@ -1,4 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.client.deployment;
 
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import javax.annotation.Nonnull;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class with methods related to job execution.
+ */
 public class ExecutorUtils {
+
+       /**
+        * Builds the {@link JobGraph} for the given {@link Pipeline}.
+        *
+        * <p>The pipeline is translated with the parallelism read from the configuration; the resulting
+        * graph is then given the jars, classpaths and savepoint restore settings found in the same configuration.
+        *
+        * @param pipeline the pipeline whose job graph we are computing
+        * @param configuration the configuration holding the jars and classpaths to include, the
+        *                         parallelism of the job and the savepoint settings used to bootstrap its state.
+        * @return the corresponding {@link JobGraph}.
+        */
+       public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) {
+               checkNotNull(pipeline);
+               checkNotNull(configuration);
+
+               final ExecutionConfigAccessor accessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+
+               final JobGraph graph = FlinkPipelineTranslationUtil.getJobGraph(
+                               pipeline,
+                               configuration,
+                               accessor.getParallelism());
+
+               // Attach everything the configuration says the job must ship or restore from.
+               graph.addJars(accessor.getJars());
+               graph.setClasspaths(accessor.getClasspaths());
+               graph.setSavepointRestoreSettings(accessor.getSavepointRestoreSettings());
+               return graph;
+       }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
index e811fc8..29d8008 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
@@ -1,4 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.client.deployment;
 
-public class JobClientImpl {
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.DetachedJobExecutionResult;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link JobClient} interface that talks to the cluster
+ * through a {@link ClusterClient}.
+ *
+ * @param <ClusterID> the type of the id of the cluster the wrapped client is connected to.
+ */
+public class JobClientImpl<ClusterID> implements JobClient {
+
+       private final ClusterClient<ClusterID> clusterClient;
+
+       private final JobID jobID;
+
+       /**
+        * @param clusterClient the client used to query the job; closed by {@link #close()}. Must not be {@code null}.
+        * @param jobID the id of the job this client refers to. Must not be {@code null}.
+        */
+       public JobClientImpl(final ClusterClient<ClusterID> clusterClient, final JobID jobID) {
+               this.jobID = checkNotNull(jobID);
+               this.clusterClient = checkNotNull(clusterClient);
+       }
+
+       @Override
+       public JobID getJobID() {
+               return jobID;
+       }
+
+       /**
+        * Returns an already completed future holding a {@link DetachedJobExecutionResult} for this job's id.
+        */
+       @Override
+       public CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+               return CompletableFuture.completedFuture(new DetachedJobExecutionResult(jobID));
+       }
+
+       /**
+        * Requests the job result from the cluster and converts it into a {@link JobExecutionResult}.
+        *
+        * <p>The returned future is always completed: with the converted result on success, or with a
+        * {@link ProgramInvocationException} if the request or the conversion fails.
+        *
+        * @param userClassloader the class loader passed to the result conversion.
+        * @return a future with the execution result of the job.
+        */
+       @Override
+       public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull final ClassLoader userClassloader) {
+               final CompletableFuture<JobExecutionResult> res = new CompletableFuture<>();
+
+               final CompletableFuture<JobResult> jobResultFuture = clusterClient.requestJobResult(jobID);
+               jobResultFuture.whenComplete((jobResult, throwable) -> {
+                       if (throwable != null) {
+                               ExceptionUtils.checkInterrupted(throwable);
+                               res.completeExceptionally(new ProgramInvocationException("Could not run job", jobID, throwable));
+                       } else {
+                               try {
+                                       final JobExecutionResult result = jobResult.toJobExecutionResult(userClassloader);
+                                       res.complete(result);
+                               } catch (JobExecutionException | IOException | ClassNotFoundException e) {
+                                       res.completeExceptionally(new ProgramInvocationException("Job failed", jobID, e));
+                               } catch (Throwable t) {
+                                       // Anything else thrown here would only fail the future returned by whenComplete(),
+                                       // which nobody observes; 'res' would stay incomplete and callers would wait forever.
+                                       res.completeExceptionally(new ProgramInvocationException("Could not process the result of the job", jobID, t));
+                               }
+                       }
+               });
+               return res;
+       }
+
+       /**
+        * Closes the underlying {@link ClusterClient}.
+        */
+       @Override
+       public void close() throws Exception {
+               this.clusterClient.close();
+       }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
new file mode 100644
index 0000000..f097323
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.deployment.AbstractSessionClusterExecutor;
+import org.apache.flink.client.deployment.StandaloneClientFactory;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.core.execution.Executor;
+
+/**
+ * The {@link Executor} to be used when executing a job on an already running cluster.
+ *
+ * <p>This class declares no submission logic of its own; it only binds
+ * {@link AbstractSessionClusterExecutor} to {@link StandaloneClusterId} and {@link StandaloneClientFactory}.
+ */
+@Internal
+public class StandaloneSessionClusterExecutor extends 
AbstractSessionClusterExecutor<StandaloneClusterId, StandaloneClientFactory> {
+
+       /** The name of this executor; {@code StandaloneSessionClusterExecutorFactory} matches it (ignoring case) against the configured deployment target. */
+       public static final String NAME = "standalone-session-cluster";
+
+       /** Creates the executor backed by a newly created {@link StandaloneClientFactory}. */
+       public StandaloneSessionClusterExecutor() {
+               super(new StandaloneClientFactory());
+       }
+}
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java
new file mode 100644
index 0000000..43c116e
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.ExecutorFactory;
+
+import javax.annotation.Nonnull;
+
+/**
+ * An {@link ExecutorFactory} for executing jobs on an existing (session) cluster.
+ */
+@Internal
+public class StandaloneSessionClusterExecutorFactory implements ExecutorFactory {
+
+       /**
+        * Checks whether the configured {@link DeploymentOptions#TARGET} equals
+        * {@link StandaloneSessionClusterExecutor#NAME}, ignoring case.
+        *
+        * @param configuration the configuration to read the deployment target from.
+        * @return {@code true} if the target names this executor; {@code false} otherwise, including
+        *         when the lookup returns {@code null} (e.g. no target configured).
+        */
+       @Override
+       public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
+               // Constant-first comparison: a null target yields false instead of a NullPointerException.
+               return StandaloneSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
+       }
+
+       /**
+        * Creates a new {@link StandaloneSessionClusterExecutor}; the configuration is not consulted here.
+        */
+       @Override
+       public Executor getExecutor(@Nonnull final Configuration configuration) {
+               return new StandaloneSessionClusterExecutor();
+       }
+}
diff --git 
a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
 
b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
new file mode 100644
index 0000000..d9b144f
--- /dev/null
+++ 
b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutorFactory
\ No newline at end of file
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
new file mode 100644
index 0000000..084b020
--- /dev/null
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.deployment.AbstractJobClusterExecutor;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.yarn.YarnClusterClientFactory;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * The {@link Executor} to be used when executing a job in isolation.
+ * This executor will start a cluster specifically for the job at hand and
+ * tear it down when the job is finished either successfully or due to an error.
+ *
+ * <p>This class declares no deployment logic of its own; it only binds
+ * {@link AbstractJobClusterExecutor} to {@link ApplicationId} and {@link YarnClusterClientFactory}.
+ */
+@Internal
+public class YarnJobClusterExecutor extends 
AbstractJobClusterExecutor<ApplicationId, YarnClusterClientFactory> {
+
+       /** The name of this executor; {@code YarnJobClusterExecutorFactory} matches it (ignoring case) against the configured deployment target. */
+       public static final String NAME = "yarn-job-cluster";
+
+       /** Creates the executor backed by a newly created {@link YarnClusterClientFactory}. */
+       public YarnJobClusterExecutor() {
+               super(new YarnClusterClientFactory());
+       }
+}
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
new file mode 100644
index 0000000..9dc6fd1
--- /dev/null
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.ExecutorFactory;
+
+import javax.annotation.Nonnull;
+
+/**
+ * An {@link ExecutorFactory} for executing jobs on dedicated (per-job) clusters.
+ */
+@Internal
+public class YarnJobClusterExecutorFactory implements ExecutorFactory {
+
+       /**
+        * Checks whether the configured {@link DeploymentOptions#TARGET} equals
+        * {@link YarnJobClusterExecutor#NAME}, ignoring case.
+        *
+        * @param configuration the configuration to read the deployment target from.
+        * @return {@code true} if the target names this executor; {@code false} otherwise, including
+        *         when the lookup returns {@code null} (e.g. no target configured).
+        */
+       @Override
+       public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
+               // Constant-first comparison: a null target yields false instead of a NullPointerException.
+               return YarnJobClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
+       }
+
+       /**
+        * Creates a new {@link YarnJobClusterExecutor}; the configuration is not consulted here.
+        */
+       @Override
+       public Executor getExecutor(@Nonnull final Configuration configuration) {
+               return new YarnJobClusterExecutor();
+       }
+}
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
new file mode 100644
index 0000000..873dce4
--- /dev/null
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.deployment.AbstractSessionClusterExecutor;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.yarn.YarnClusterClientFactory;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * The {@link Executor} to be used when executing a job on an already running cluster.
+ *
+ * <p>This class declares no submission logic of its own; it only binds
+ * {@link AbstractSessionClusterExecutor} to {@link ApplicationId} and {@link YarnClusterClientFactory}.
+ */
+@Internal
+public class YarnSessionClusterExecutor extends 
AbstractSessionClusterExecutor<ApplicationId, YarnClusterClientFactory> {
+
+       /** The name of this executor; {@code YarnSessionClusterExecutorFactory} matches it (ignoring case) against the configured deployment target. */
+       public static final String NAME = "yarn-session-cluster";
+
+       /** Creates the executor backed by a newly created {@link YarnClusterClientFactory}. */
+       public YarnSessionClusterExecutor() {
+               super(new YarnClusterClientFactory());
+       }
+}
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
new file mode 100644
index 0000000..101a622
--- /dev/null
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.ExecutorFactory;
+
+import javax.annotation.Nonnull;
+
+/**
+ * An {@link ExecutorFactory} for executing jobs on an existing (session) cluster.
+ */
+@Internal
+public class YarnSessionClusterExecutorFactory implements ExecutorFactory {
+
+       /**
+        * Checks whether the configured {@link DeploymentOptions#TARGET} equals
+        * {@link YarnSessionClusterExecutor#NAME}, ignoring case.
+        *
+        * @param configuration the configuration to read the deployment target from.
+        * @return {@code true} if the target names this executor; {@code false} otherwise, including
+        *         when the lookup returns {@code null} (e.g. no target configured).
+        */
+       @Override
+       public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
+               // Constant-first comparison: a null target yields false instead of a NullPointerException.
+               return YarnSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
+       }
+
+       /**
+        * Creates a new {@link YarnSessionClusterExecutor}; the configuration is not consulted here.
+        */
+       @Override
+       public Executor getExecutor(@Nonnull final Configuration configuration) {
+               return new YarnSessionClusterExecutor();
+       }
+}
diff --git 
a/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
 
b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
new file mode 100644
index 0000000..d56f8c5
--- /dev/null
+++ 
b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.yarn.executors.YarnJobClusterExecutorFactory
+org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory
\ No newline at end of file

Reply via email to