[ 
https://issues.apache.org/jira/browse/FLINK-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16133056#comment-16133056
 ] 

ASF GitHub Bot commented on FLINK-6630:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4555#discussion_r133974899
  
    --- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
 ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.entrypoint;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.GlobalConfiguration;
    +import org.apache.flink.configuration.JobManagerOptions;
    +import 
org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
    +import 
org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
    +import 
org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
    +import 
org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
    +import org.apache.flink.mesos.util.MesosConfiguration;
    +import org.apache.flink.runtime.blob.BlobServer;
    +import org.apache.flink.runtime.clusterframework.BootstrapTools;
    +import org.apache.flink.runtime.clusterframework.ContainerSpecification;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
    +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.resourcemanager.ResourceManager;
    +import 
org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
    +import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
    +import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.util.FlinkException;
    +
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.CommandLineParser;
    +import org.apache.commons.cli.Options;
    +import org.apache.commons.cli.PosixParser;
    +
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.util.Map;
    +
    +/**
    + * Entry point for Mesos per-job clusters.
    + */
    +public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
    +
    +   public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
    +
    +   // 
------------------------------------------------------------------------
    +   //  Command-line options
    +   // 
------------------------------------------------------------------------
    +
    +   private static final Options ALL_OPTIONS;
    +
    +   static {
    +           ALL_OPTIONS =
    +                   new Options()
    +                           
.addOption(BootstrapTools.newDynamicPropertiesOption());
    +   }
    +
    +   private MesosConfiguration schedulerConfiguration;
    +
    +   private MesosServices mesosServices;
    +
    +   private MesosTaskManagerParameters taskManagerParameters;
    +
    +   private ContainerSpecification taskManagerContainerSpec;
    +
    +   public MesosJobClusterEntrypoint(Configuration config) {
    +           super(config);
    +   }
    +
    +   @Override
    +   protected void initializeServices(Configuration config) throws 
Exception {
    +           super.initializeServices(config);
    +
    +           final String hostname = 
config.getString(JobManagerOptions.ADDRESS);
    +
    +           // Mesos configuration
    +           schedulerConfiguration = 
MesosEntrypointUtils.createMesosSchedulerConfiguration(config, hostname);
    +
    +           // services
    +           mesosServices = MesosServicesUtils.createMesosServices(config, 
hostname);
    +
    +           // TM configuration
    +           taskManagerParameters = 
MesosEntrypointUtils.createTmParameters(config, LOG);
    +           taskManagerContainerSpec = 
MesosEntrypointUtils.createContainerSpec(config, 
GlobalConfiguration.getDynamicProperties());
    +   }
    +
    +   @Override
    +   protected void startClusterComponents(Configuration configuration, 
RpcService rpcService, HighAvailabilityServices highAvailabilityServices, 
BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry 
metricRegistry) throws Exception {
    +           super.startClusterComponents(configuration, rpcService, 
highAvailabilityServices, blobServer, heartbeatServices, metricRegistry);
    +   }
    +
    +   @Override
    +   protected ResourceManager<?> createResourceManager(
    +           Configuration configuration,
    +           ResourceID resourceId,
    +           RpcService rpcService,
    +           HighAvailabilityServices highAvailabilityServices,
    +           HeartbeatServices heartbeatServices,
    +           MetricRegistry metricRegistry,
    +           FatalErrorHandler fatalErrorHandler) throws Exception {
    +           final ResourceManagerConfiguration rmConfiguration = 
ResourceManagerConfiguration.fromConfiguration(configuration);
    +           final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
    +           final ResourceManagerRuntimeServices rmRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
    +                   rmServicesConfiguration,
    +                   highAvailabilityServices,
    +                   rpcService.getScheduledExecutor());
    +
    +           return new MesosResourceManager(
    +                   rpcService,
    +                   ResourceManager.RESOURCE_MANAGER_NAME,
    +                   resourceId,
    +                   rmConfiguration,
    +                   highAvailabilityServices,
    +                   heartbeatServices,
    +                   rmRuntimeServices.getSlotManager(),
    +                   metricRegistry,
    +                   rmRuntimeServices.getJobLeaderIdService(),
    +                   fatalErrorHandler,
    +                   configuration,
    +                   mesosServices,
    +                   schedulerConfiguration,
    +                   taskManagerParameters,
    +                   taskManagerContainerSpec
    +                   );
    +   }
    +
    +   @Override
    +   protected JobGraph retrieveJobGraph(Configuration configuration) throws 
FlinkException {
    +           String jobGraphFile = 
configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph");
    +           File fp = new File(jobGraphFile);
    +
    +           try (FileInputStream input = new FileInputStream(fp);
    +                   ObjectInputStream obInput = new 
ObjectInputStream(input)) {
    +
    +                   return (JobGraph) obInput.readObject();
    +           } catch (FileNotFoundException e) {
    +                   throw new FlinkException("Could not find the JobGraph 
file.", e);
    +           } catch (ClassNotFoundException | IOException e) {
    +                   throw new FlinkException("Could not load the JobGraph 
from file.", e);
    +           }
    +   }
    +
    +   @Override
    +   protected void stopClusterComponents(boolean cleanupHaData) throws 
Exception {
    +           Throwable exception = null;
    +
    +           try {
    +                   super.stopClusterComponents(cleanupHaData);
    +           } catch (Throwable t) {
    +                   exception = t;
    +           }
    +
    +           if (mesosServices != null) {
    +                   try {
    +                           mesosServices.close(cleanupHaData);
    +                   } catch (Throwable t) {
    +                           exception = t;
    --- End diff --
    
    Here we could use `ExceptionUtils.firstOrSuppressed` in order to add this 
as a suppressed exception if the super call also failed before.


> Implement FLIP-6 MesosAppMasterRunner
> -------------------------------------
>
>                 Key: FLINK-6630
>                 URL: https://issues.apache.org/jira/browse/FLINK-6630
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Mesos
>            Reporter: Eron Wright 
>            Assignee: Eron Wright 
>
> A new runner must be developed for the FLIP-6 RM.  Target the "single job" 
> scenario.
> Take some time to consider a general solution or a base implementation that 
> is shared with the old implementation.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to