This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executors-clean in repository https://gitbox.apache.org/repos/asf/flink.git
commit fc182735564bf6a6a0fdd687ec260783ceeda8e2 Author: Kostas Kloudas <kklou...@gmail.com> AuthorDate: Mon Nov 18 17:10:15 2019 +0100 [FLINK-XXXXX] Add the Yarn/Standalone Executors --- .../deployment/AbstractJobClusterExecutor.java | 68 ++++++++++++++++- .../deployment/AbstractSessionClusterExecutor.java | 65 ++++++++++++++++- .../flink/client/deployment/ExecutorUtils.java | 55 ++++++++++++++ .../flink/client/deployment/JobClientImpl.java | 85 +++++++++++++++++++++- .../StandaloneSessionClusterExecutor.java | 38 ++++++++++ .../StandaloneSessionClusterExecutorFactory.java | 45 ++++++++++++ ...org.apache.flink.core.execution.ExecutorFactory | 16 ++++ .../yarn/executors/YarnJobClusterExecutor.java | 41 +++++++++++ .../executors/YarnJobClusterExecutorFactory.java | 45 ++++++++++++ .../yarn/executors/YarnSessionClusterExecutor.java | 39 ++++++++++ .../YarnSessionClusterExecutorFactory.java | 45 ++++++++++++ ...org.apache.flink.core.execution.ExecutorFactory | 17 +++++ 12 files changed, 556 insertions(+), 3 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java index 14a93bc..bca403a 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java @@ -1,4 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.flink.client.deployment; -public class AbstractJobClusterExecutor { +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.client.cli.ExecutionConfigAccessor; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.Executor; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An abstract {@link Executor} used to execute {@link Pipeline pipelines} on dedicated (per-job) clusters. + * + * @param <ClusterID> the type of the id of the cluster. + * @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create/retrieve a client to the target cluster. 
+ */ +@Internal +public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements Executor { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractJobClusterExecutor.class); + + private final ClientFactory clusterClientFactory; + + public AbstractJobClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) { + this.clusterClientFactory = checkNotNull(clusterClientFactory); + } + + @Override + public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception { + final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration); + + try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) { + final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); + + final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration); + + final ClusterClient<ClusterID> clusterClient = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode()); + LOG.info("Job has been submitted with JobID " + jobGraph.getJobID()); + return CompletableFuture.completedFuture(new JobClientImpl<>(clusterClient, jobGraph.getJobID())); + } + } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java index ab8cc01..413ff58 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java @@ -1,4 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.flink.client.deployment; -public class AbstractSessionClusterExecutor { +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.Executor; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * An abstract {@link Executor} used to execute {@link Pipeline pipelines} on an existing (session) cluster. + * + * @param <ClusterID> the type of the id of the cluster. + * @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create/retrieve a client to the target cluster. 
+ */ +@Internal +public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements Executor { + + private final ClientFactory clusterClientFactory; + + public AbstractSessionClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) { + this.clusterClientFactory = checkNotNull(clusterClientFactory); + } + + @Override + public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception { + final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration); + + try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) { + final ClusterID clusterID = clusterClientFactory.getClusterId(configuration); + checkState(clusterID != null); + + final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID); + return clusterClient + .submitJob(jobGraph) + .thenApply(JobSubmissionResult::getJobID) + .thenApply(jobID -> new JobClientImpl<>(clusterClient, jobID)); + } + } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java index e134206..4541b1c 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java @@ -1,4 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.flink.client.deployment; +import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.client.FlinkPipelineTranslationUtil; +import org.apache.flink.client.cli.ExecutionConfigAccessor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import javax.annotation.Nonnull; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Utility class with methods related to job execution. + */ public class ExecutorUtils { + + /** + * Creates the {@link JobGraph} corresponding to the provided {@link Pipeline}. + * + * @param pipeline the pipeline whose job graph we are computing + * @param configuration the configuration with the necessary information such as jars and + * classpaths to be included, the parallelism of the job and potential + * savepoint settings used to bootstrap its state. + * @return the corresponding {@link JobGraph}. 
+ */ + public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) { + checkNotNull(pipeline); + checkNotNull(configuration); + + final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); + final JobGraph jobGraph = FlinkPipelineTranslationUtil + .getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism()); + + jobGraph.addJars(executionConfigAccessor.getJars()); + jobGraph.setClasspaths(executionConfigAccessor.getClasspaths()); + jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings()); + + return jobGraph; + } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java index e811fc8..29d8008 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java @@ -1,4 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.flink.client.deployment; -public class JobClientImpl { +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.DetachedJobExecutionResult; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.util.ExceptionUtils; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An implementation of the {@link JobClient} interface. + */ +public class JobClientImpl<ClusterID> implements JobClient { + + private final ClusterClient<ClusterID> clusterClient; + + private final JobID jobID; + + public JobClientImpl(final ClusterClient<ClusterID> clusterClient, final JobID jobID) { + this.jobID = checkNotNull(jobID); + this.clusterClient = checkNotNull(clusterClient); + } + + @Override + public JobID getJobID() { + return jobID; + } + + @Override + public CompletableFuture<JobExecutionResult> getJobSubmissionResult() { + return CompletableFuture.completedFuture(new DetachedJobExecutionResult(jobID)); + } + + @Override + public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull final ClassLoader userClassloader) { + final CompletableFuture<JobExecutionResult> res = new CompletableFuture<>(); + + final CompletableFuture<JobResult> jobResultFuture = clusterClient.requestJobResult(jobID); + jobResultFuture.whenComplete(((jobResult, throwable) -> { + if (throwable != null) { + ExceptionUtils.checkInterrupted(throwable); + res.completeExceptionally(new ProgramInvocationException("Could not run job", jobID, throwable)); + } else { + try { + final JobExecutionResult result 
= jobResult.toJobExecutionResult(userClassloader); + res.complete(result); + } catch (JobExecutionException | IOException | ClassNotFoundException e) { + res.completeExceptionally(new ProgramInvocationException("Job failed", jobID, e)); + } + } + })); + return res; + } + + @Override + public void close() throws Exception { + this.clusterClient.close(); + } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java new file mode 100644 index 0000000..f097323 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.deployment.executors; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.client.deployment.AbstractSessionClusterExecutor; +import org.apache.flink.client.deployment.StandaloneClientFactory; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.core.execution.Executor; + +/** + * The {@link Executor} to be used when executing a job on an already running cluster. + */ +@Internal +public class StandaloneSessionClusterExecutor extends AbstractSessionClusterExecutor<StandaloneClusterId, StandaloneClientFactory> { + + public static final String NAME = "standalone-session-cluster"; + + public StandaloneSessionClusterExecutor() { + super(new StandaloneClientFactory()); + } +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java new file mode 100644 index 0000000..43c116e --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.executors; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.core.execution.Executor; +import org.apache.flink.core.execution.ExecutorFactory; + +import javax.annotation.Nonnull; + +/** + * An {@link ExecutorFactory} for executing jobs on an existing (session) cluster. + */ +@Internal +public class StandaloneSessionClusterExecutorFactory implements ExecutorFactory { + + @Override + public boolean isCompatibleWith(@Nonnull final Configuration configuration) { + return configuration.get(DeploymentOptions.TARGET) + .equalsIgnoreCase(StandaloneSessionClusterExecutor.NAME); + } + + @Override + public Executor getExecutor(@Nonnull final Configuration configuration) { + return new StandaloneSessionClusterExecutor(); + } +} diff --git a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory new file mode 100644 index 0000000..d9b144f --- /dev/null +++ b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutorFactory \ No newline at end of file diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java new file mode 100644 index 0000000..084b020 --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn.executors; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.client.deployment.AbstractJobClusterExecutor; +import org.apache.flink.core.execution.Executor; +import org.apache.flink.yarn.YarnClusterClientFactory; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +/** + * The {@link Executor} to be used when executing a job in isolation. + * This executor will start a cluster specifically for the job at hand and + * tear it down when the job is finished either successfully or due to an error. + */ +@Internal +public class YarnJobClusterExecutor extends AbstractJobClusterExecutor<ApplicationId, YarnClusterClientFactory> { + + public static final String NAME = "yarn-job-cluster"; + + public YarnJobClusterExecutor() { + super(new YarnClusterClientFactory()); + } +} diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java new file mode 100644 index 0000000..9dc6fd1 --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.executors; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.core.execution.Executor; +import org.apache.flink.core.execution.ExecutorFactory; + +import javax.annotation.Nonnull; + +/** + * An {@link ExecutorFactory} for executing jobs on dedicated (per-job) clusters. + */ +@Internal +public class YarnJobClusterExecutorFactory implements ExecutorFactory { + + @Override + public boolean isCompatibleWith(@Nonnull final Configuration configuration) { + return configuration.get(DeploymentOptions.TARGET) + .equalsIgnoreCase(YarnJobClusterExecutor.NAME); + } + + @Override + public Executor getExecutor(@Nonnull final Configuration configuration) { + return new YarnJobClusterExecutor(); + } +} diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java new file mode 100644 index 0000000..873dce4 --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.executors; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.client.deployment.AbstractSessionClusterExecutor; +import org.apache.flink.core.execution.Executor; +import org.apache.flink.yarn.YarnClusterClientFactory; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +/** + * The {@link Executor} to be used when executing a job on an already running cluster. + */ +@Internal +public class YarnSessionClusterExecutor extends AbstractSessionClusterExecutor<ApplicationId, YarnClusterClientFactory> { + + public static final String NAME = "yarn-session-cluster"; + + public YarnSessionClusterExecutor() { + super(new YarnClusterClientFactory()); + } +} diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java new file mode 100644 index 0000000..101a622 --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.executors; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.core.execution.Executor; +import org.apache.flink.core.execution.ExecutorFactory; + +import javax.annotation.Nonnull; + +/** + * An {@link ExecutorFactory} for executing jobs on an existing (session) cluster. + */ +@Internal +public class YarnSessionClusterExecutorFactory implements ExecutorFactory { + + @Override + public boolean isCompatibleWith(@Nonnull final Configuration configuration) { + return configuration.get(DeploymentOptions.TARGET) + .equalsIgnoreCase(YarnSessionClusterExecutor.NAME); + } + + @Override + public Executor getExecutor(@Nonnull final Configuration configuration) { + return new YarnSessionClusterExecutor(); + } +} diff --git a/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory new file mode 100644 index 0000000..d56f8c5 --- /dev/null +++ b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.yarn.executors.YarnJobClusterExecutorFactory +org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory \ No newline at end of file