[ 
https://issues.apache.org/jira/browse/FLINK-7082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101400#comment-16101400
 ] 

ASF GitHub Bot commented on FLINK-7082:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4261#discussion_r129519933
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.entrypoint;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.GlobalConfiguration;
    +import org.apache.flink.configuration.JobManagerOptions;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.blob.BlobServer;
    +import org.apache.flink.runtime.clusterframework.BootstrapTools;
    +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
    +import org.apache.flink.runtime.security.SecurityContext;
    +import org.apache.flink.runtime.security.SecurityUtils;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.FlinkException;
    +
    +import akka.actor.ActorSystem;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.concurrent.GuardedBy;
    +
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.Executor;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +/**
    + * Base class for the Flink cluster entry points.
    + *
    + * <p>Specialization of this class can be used for the session mode and 
the per-job mode
    + */
    +public abstract class ClusterEntrypoint implements FatalErrorHandler {
    +
    +   protected static final Logger LOG = 
LoggerFactory.getLogger(ClusterEntrypoint.class);
    +
    +   protected static final int SUCCESS_RETURN_CODE = 0;
    +   protected static final int STARTUP_FAILURE_RETURN_CODE = 1;
    +   protected static final int RUNTIME_FAILURE_RETURN_CODE = 2;
    +
    +   /** The lock to guard startup / shutdown / manipulation methods. */
    +   private final Object lock = new Object();
    +
    +   @GuardedBy("lock")
    +   private MetricRegistry metricRegistry = null;
    +
    +   @GuardedBy("lock")
    +   private HighAvailabilityServices haServices = null;
    +
    +   @GuardedBy("lock")
    +   private BlobServer blobServer = null;
    +
    +   @GuardedBy("lock")
    +   private HeartbeatServices heartbeatServices = null;
    +
    +   @GuardedBy("lock")
    +   private RpcService commonRpcService = null;
    +
    +   protected void startCluster(String[] args) {
    +           final ClusterConfiguration clusterConfiguration = 
parseArguments(args);
    +
    +           final Configuration configuration = 
loadConfiguration(clusterConfiguration);
    +
    +           try {
    +                   SecurityContext securityContext = 
installSecurityContext(configuration);
    +
    +                   securityContext.runSecured(new Callable<Void>() {
    +                           @Override
    +                           public Void call() throws Exception {
    +                                   runCluster(configuration);
    +
    +                                   return null;
    +                           }
    +                   });
    +           } catch (Throwable t) {
    +                   LOG.error("Cluster initialization failed.", t);
    +
    +                   try {
    +                           shutDown(false);
    +                   } catch (Throwable st) {
    +                           LOG.error("Could not properly shut down cluster 
entrypoint.", st);
    +                   }
    +
    +                   System.exit(STARTUP_FAILURE_RETURN_CODE);
    +           }
    +   }
    +
    +   protected ClusterConfiguration parseArguments(String[] args) {
    +           ParameterTool parameterTool = ParameterTool.fromArgs(args);
    +
    +           final String configDir = parameterTool.get("configDir", "");
    +
    +           return new ClusterConfiguration(configDir);
    +   }
    +
    +   protected Configuration loadConfiguration(ClusterConfiguration 
clusterConfiguration) {
    --- End diff --
    
    Follow-up JIRA: https://issues.apache.org/jira/browse/FLINK-7270


> Flip-6: Generic entry point for Flink sessions
> ----------------------------------------------
>
>                 Key: FLINK-7082
>                 URL: https://issues.apache.org/jira/browse/FLINK-7082
>             Project: Flink
>          Issue Type: Bug
>          Components: Cluster Management
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>              Labels: flip-6
>
> Implement a generic entry point for Flink sessions. This 
> {{ClusterEntryPoint}} has to start a {{ResourceManager}}, the {{Dispatcher}} 
> component and the cluster's RESTful endpoint. This class could serve as the 
> basis for a {{Mesos-}} and {{YarnEntryPoint}} to run Flink sessions.
> Maybe we can use a common base for the session and the per-job mode. The 
> session has to start a dispatcher component and the per-job mode retrieves 
> the {{JobGraph}} and directly starts a {{JobManager}} with this job.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to