[ https://issues.apache.org/jira/browse/FLINK-7108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16092413#comment-16092413 ]
ASF GitHub Bot commented on FLINK-7108:
---------------------------------------
Github user EronWright commented on a diff in the pull request:
https://github.com/apache/flink/pull/4281#discussion_r128130195
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnResourceManager;
+
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+
+public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
+
+	/** The job graph file path. */
+	public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
+
+	private final String workingDirectory;
+
+	public YarnJobClusterEntrypoint(
+			Configuration configuration,
+			String workingDirectory) {
+
+		super(configuration);
+		this.workingDirectory = Preconditions.checkNotNull(workingDirectory);
+	}
+
+	@Override
+	protected SecurityContext installSecurityContext(Configuration configuration) throws Exception {
+		return YarnEntrypointUtils.installSecurityContext(configuration, workingDirectory);
+	}
+
+	@Override
+	protected ResourceManager<?> createResourceManager(
+			Configuration configuration,
+			ResourceID resourceId,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+		final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
+		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
+			rmServicesConfiguration,
+			highAvailabilityServices,
+			rpcService.getScheduledExecutor());
+
+		return new YarnResourceManager(
+			rpcService,
+			ResourceManager.RESOURCE_MANAGER_NAME,
+			resourceId,
+			configuration,
+			System.getenv(),
+			rmConfiguration,
+			highAvailabilityServices,
+			heartbeatServices,
+			rmRuntimeServices.getSlotManager(),
+			metricRegistry,
+			rmRuntimeServices.getJobLeaderIdService(),
+			fatalErrorHandler);
+	}
+
+	@Override
+	protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
+		String jobGraphFile = configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph");
--- End diff --
Ideally, a standard approach to locating the job to launch could be
developed across K8s/YARN/etc. It hadn't occurred to me that the Docker image
would contain a serialized job graph (as opposed to the packaged program
with its `main` method), but that definitely simplifies the recovery model. It
has me wondering how `flink run` would combine with this.
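A minimal sketch of what a deployment-agnostic lookup might look like. It assumes the job is shipped as a Java-serialized file, which is what the `ObjectInputStream` import in the diff suggests. All names here are hypothetical and not Flink API: `JobSpec` stands in for `JobGraph`, `JobSpecRetriever` is the shared abstraction, and a plain `Map<String, String>` replaces Flink's `Configuration`. The only things taken from the diff are the key `flink.jobgraph.path` and its default `job.graph`.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;

// Hypothetical stand-in for Flink's JobGraph so the sketch compiles without Flink.
class JobSpec implements Serializable {
    private static final long serialVersionUID = 1L;
    final String jobName;

    JobSpec(String jobName) {
        this.jobName = jobName;
    }
}

// Hypothetical shared abstraction: each deployment (YARN, Kubernetes, ...)
// decides where the job comes from; the entry point only asks for it.
interface JobSpecRetriever {
    JobSpec retrieve(Map<String, String> config) throws IOException;
}

// File-based variant mirroring the diff: read the path from
// "flink.jobgraph.path", falling back to "job.graph".
class FileJobSpecRetriever implements JobSpecRetriever {
    static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
    static final String DEFAULT_FILE = "job.graph";

    @Override
    public JobSpec retrieve(Map<String, String> config) throws IOException {
        File file = new File(config.getOrDefault(JOB_GRAPH_FILE_PATH, DEFAULT_FILE));
        // A missing file surfaces as FileNotFoundException, a subclass of IOException.
        try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) {
            return (JobSpec) in.readObject();
        } catch (ClassNotFoundException e) {
            throw new IOException("Could not deserialize job from " + file, e);
        }
    }

    // Counterpart used when building the image/container: serialize the job to a file.
    static void write(JobSpec spec, File file) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(file))) {
            out.writeObject(spec);
        }
    }
}

public class JobSpecRetrieverSketch {
    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("job", ".graph");
        tmp.deleteOnExit();
        FileJobSpecRetriever.write(new JobSpec("wordcount"), tmp);

        JobSpec spec = new FileJobSpecRetriever()
            .retrieve(Map.of(FileJobSpecRetriever.JOB_GRAPH_FILE_PATH, tmp.getPath()));
        System.out.println(spec.jobName);
    }
}
```

Running `main` prints `wordcount`. Keeping the lookup behind an interface would let a Kubernetes entry point supply a different source, such as a path baked into the image, without changing the entry point itself. It does not answer how `flink run` would fit in.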
> Implement Session cluster entry point
> -------------------------------------
>
> Key: FLINK-7108
> URL: https://issues.apache.org/jira/browse/FLINK-7108
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management, YARN
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Labels: flip-6
>
> Implement a Yarn session cluster entry point.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)