[FLINK-4928] [yarn] Implement FLIP-6 YARN Application Master Runner

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e8293bcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e8293bcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e8293bcb

Branch: refs/heads/flip-6
Commit: e8293bcba588296656ae8425506bd2edf94a70e4
Parents: 8e57fba
Author: shuai.xus <[email protected]>
Authored: Thu Nov 3 16:24:47 2016 +0800
Committer: Stephan Ewen <[email protected]>
Committed: Mon Dec 5 02:49:44 2016 +0100

----------------------------------------------------------------------
 ...bstractYarnFlinkApplicationMasterRunner.java | 213 +++++++++++++
 .../yarn/YarnFlinkApplicationMasterRunner.java  | 316 +++++++++++++++++++
 2 files changed, 529 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e8293bcb/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
new file mode 100644
index 0000000..923694e
--- /dev/null
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmaster.JobMaster}
+ * and {@link YarnResourceManager}.
+ *
+ * The JobMasters handles Flink job execution, while the YarnResourceManager 
handles container
+ * allocation and failure detection.
+ */
+public abstract class AbstractYarnFlinkApplicationMasterRunner {
+
+       /** Logger */
+       protected static final Logger LOG = 
LoggerFactory.getLogger(AbstractYarnFlinkApplicationMasterRunner.class);
+
+       /** The process environment variables */
+       protected static final Map<String, String> ENV = System.getenv();
+
+       /** The exit code returned if the initialization of the application 
master failed */
+       protected static final int INIT_ERROR_EXIT_CODE = 31;
+
+       /** The host name passed by env */
+       protected String appMasterHostname;
+
+       /**
+        * The instance entry point for the YARN application master. Obtains 
user group
+        * information and calls the main work method {@link 
#runApplicationMaster(org.apache.flink.configuration.Configuration)} as a
+        * privileged action.
+        *
+        * @param args The command line arguments.
+        * @return The process exit code.
+        */
+       protected int run(String[] args) {
+               try {
+                       LOG.debug("All environment variables: {}", ENV);
+
+                       final String yarnClientUsername = 
ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+                       Preconditions.checkArgument(yarnClientUsername != null, 
"YARN client user name environment variable {} not set",
+                               YarnConfigKeys.ENV_HADOOP_USER_NAME);
+
+                       final String currDir = ENV.get(Environment.PWD.key());
+                       Preconditions.checkArgument(currDir != null, "Current 
working directory variable (%s) not set", Environment.PWD.key());
+                       LOG.debug("Current working directory: {}", currDir);
+
+                       final String remoteKeytabPath = 
ENV.get(YarnConfigKeys.KEYTAB_PATH);
+                       LOG.debug("Remote keytab path obtained {}", 
remoteKeytabPath);
+
+                       final String remoteKeytabPrincipal = 
ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+                       LOG.info("Remote keytab principal obtained {}", 
remoteKeytabPrincipal);
+
+                       String keytabPath = null;
+                       if(remoteKeytabPath != null) {
+                               File f = new File(currDir, 
Utils.KEYTAB_FILE_NAME);
+                               keytabPath = f.getAbsolutePath();
+                               LOG.debug("Keytab path: {}", keytabPath);
+                       }
+
+                       UserGroupInformation currentUser = 
UserGroupInformation.getCurrentUser();
+
+                       LOG.info("YARN daemon is running as: {} Yarn client 
user obtainer: {}",
+                                       currentUser.getShortUserName(), 
yarnClientUsername );
+
+                       SecurityContext.SecurityConfiguration sc = new 
SecurityContext.SecurityConfiguration();
+
+                       //To support Yarn Secure Integration Test Scenario
+                       File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
+                       if(krb5Conf.exists() && krb5Conf.canRead()) {
+                               String krb5Path = krb5Conf.getAbsolutePath();
+                               LOG.info("KRB5 Conf: {}", krb5Path);
+                               org.apache.hadoop.conf.Configuration conf = new 
org.apache.hadoop.conf.Configuration();
+                               
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, 
"kerberos");
+                               
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+                               sc.setHadoopConfiguration(conf);
+                       }
+
+                       // Flink configuration
+                       final Map<String, String> dynamicProperties =
+                                       
FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
+                       LOG.debug("YARN dynamic properties: {}", 
dynamicProperties);
+
+                       final Configuration flinkConfig = 
createConfiguration(currDir, dynamicProperties);
+                       if(keytabPath != null && remoteKeytabPrincipal != null) 
{
+                               
flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+                               
flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, 
remoteKeytabPrincipal);
+                       }
+
+                       
SecurityContext.install(sc.setFlinkConfiguration(flinkConfig));
+
+                       // Note that we use the "appMasterHostname" given by 
YARN here, to make sure
+                       // we use the hostnames given by YARN consistently 
throughout akka.
+                       // for akka "localhost" and "localhost.localdomain" are 
different actors.
+                       this.appMasterHostname = 
ENV.get(Environment.NM_HOST.key());
+                       Preconditions.checkArgument(appMasterHostname != null,
+                                       "ApplicationMaster hostname variable %s 
not set", Environment.NM_HOST.key());
+                       LOG.info("YARN assigned hostname for application 
master: {}", appMasterHostname);
+
+                       return SecurityContext.getInstalled().runSecured(new 
SecurityContext.FlinkSecuredRunner<Integer>() {
+                               @Override
+                               public Integer run() {
+                                       return 
runApplicationMaster(flinkConfig);
+                               }
+                       });
+
+               }
+               catch (Throwable t) {
+                       // make sure that everything whatever ends up in the log
+                       LOG.error("YARN Application Master initialization 
failed", t);
+                       return INIT_ERROR_EXIT_CODE;
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Core work method
+       // 
------------------------------------------------------------------------
+
+       /**
+        * The main work method, must run as a privileged action.
+        *
+        * @return The return code for the Java process.
+        */
+       protected abstract int runApplicationMaster(Configuration config);
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+       /**
+        * @param baseDirectory  The working directory
+        * @param additional Additional parameters
+        * 
+        * @return The configuration to be used by the TaskExecutors.
+        */
+       private static Configuration createConfiguration(String baseDirectory, 
Map<String, String> additional) {
+               LOG.info("Loading config from directory {}.", baseDirectory);
+
+               Configuration configuration = 
GlobalConfiguration.loadConfiguration(baseDirectory);
+
+               
configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirectory);
+
+               // add dynamic properties to JobManager configuration.
+               for (Map.Entry<String, String> property : 
additional.entrySet()) {
+                       configuration.setString(property.getKey(), 
property.getValue());
+               }
+
+               // override zookeeper namespace with user cli argument (if 
provided)
+               String cliZKNamespace = 
ENV.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);
+               if (cliZKNamespace != null && !cliZKNamespace.isEmpty()) {
+                       
configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, cliZKNamespace);
+               }
+
+               // if a web monitor shall be started, set the port to random 
binding
+               if 
(configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
+                       
configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+               }
+
+               // if the user has set the deprecated YARN-specific config 
keys, we add the 
+               // corresponding generic config keys instead. that way, later 
code needs not
+               // deal with deprecated config keys
+
+               BootstrapTools.substituteDeprecatedConfigKey(configuration,
+                       ConfigConstants.YARN_HEAP_CUTOFF_RATIO,
+                       ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
+
+               BootstrapTools.substituteDeprecatedConfigKey(configuration,
+                       ConfigConstants.YARN_HEAP_CUTOFF_MIN,
+                       ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
+
+               BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
+                       ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
+                       ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX);
+
+               BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
+                       ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
+                       ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
+
+               return configuration;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e8293bcb/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
new file mode 100644
index 0000000..e58c77e
--- /dev/null
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.ObjectInputStream;
+
+/**
+ * This class is the executable entry point for the YARN application master.
+ * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmaster.JobManagerRunner}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobMasnagerRunner start a {@link 
org.apache.flink.runtime.jobmaster.JobMaster}
+ * JobMaster handles Flink job execution, while the YarnResourceManager 
handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner extends 
AbstractYarnFlinkApplicationMasterRunner
+               implements OnCompletionActions, FatalErrorHandler {
+
+       /** Logger */
+       protected static final Logger LOG = 
LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
+
+       /** The job graph file path */
+       private static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
+
+       /** The lock to guard startup / shutdown / manipulation methods */
+       private final Object lock = new Object();
+
+       @GuardedBy("lock")
+       private MetricRegistry metricRegistry;
+
+       @GuardedBy("lock")
+       private HighAvailabilityServices haServices;
+
+       @GuardedBy("lock")
+       private RpcService commonRpcService;
+
+       @GuardedBy("lock")
+       private ResourceManager resourceManager;
+
+       @GuardedBy("lock")
+       private JobManagerRunner jobManagerRunner;
+
+       @GuardedBy("lock")
+       private JobGraph jobGraph;
+
+       // 
------------------------------------------------------------------------
+       //  Program entry point
+       // 
------------------------------------------------------------------------
+
+       /**
+        * The entry point for the YARN application master.
+        *
+        * @param args The command line arguments.
+        */
+       public static void main(String[] args) {
+               EnvironmentInformation.logEnvironmentInfo(LOG, "YARN 
ApplicationMaster runner", args);
+               SignalHandler.register(LOG);
+               JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+               // run and exit with the proper return code
+               int returnCode = new 
YarnFlinkApplicationMasterRunner().run(args);
+               System.exit(returnCode);
+       }
+
+       @Override
+       protected int runApplicationMaster(Configuration config) {
+
+               try {
+                       // ---- (1) create common services
+
+                       // try to start the rpc service
+                       // using the port range definition from the config.
+                       final String amPortRange = config.getString(
+                                       
ConfigConstants.YARN_APPLICATION_MASTER_PORT,
+                                       
ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
+
+                       synchronized (lock) {
+                               haServices = 
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config);
+                               metricRegistry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+                               commonRpcService = createRpcService(config, 
appMasterHostname, amPortRange);
+
+                               // ---- (2) init resource manager -------
+                               resourceManager = createResourceManager(config);
+
+                               // ---- (3) init job master parameters
+                               jobManagerRunner = 
createJobManagerRunner(config);
+
+                               // ---- (4) start the resource manager  and job 
manager runner:
+                               resourceManager.start();
+                               LOG.debug("YARN Flink Resource Manager 
started");
+
+                               jobManagerRunner.start();
+                               LOG.debug("Job Manager Runner started");
+
+                               // ---- (5) start the web monitor
+                               // TODO: add web monitor
+                       }
+
+                       // wait for resource manager to finish
+                       resourceManager.getTerminationFuture().get();
+                       // everything started, we can wait until all is done or 
the process is killed
+                       LOG.info("YARN Application Master finished");
+               }
+               catch (Throwable t) {
+                       // make sure that everything whatever ends up in the log
+                       LOG.error("YARN Application Master initialization 
failed", t);
+                       shutdown(ApplicationStatus.FAILED, t.getMessage());
+                       return INIT_ERROR_EXIT_CODE;
+               }
+
+               return 0;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       protected RpcService createRpcService(
+                       Configuration configuration,
+                       String bindAddress,
+                       String portRange) throws Exception{
+               ActorSystem actorSystem = 
BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG);
+               FiniteDuration duration = AkkaUtils.getTimeout(configuration);
+               return new AkkaRpcService(actorSystem, 
Time.of(duration.length(), duration.unit()));
+       }
+
+       private ResourceManager createResourceManager(Configuration config) 
throws ConfigurationException {
+               final ResourceManagerConfiguration resourceManagerConfiguration 
= ResourceManagerConfiguration.fromConfiguration(config);
+               final SlotManagerFactory slotManagerFactory = new 
DefaultSlotManager.Factory();
+               final JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(haServices);
+
+               return new YarnResourceManager(config,
+                               ENV,
+                               commonRpcService,
+                               resourceManagerConfiguration,
+                               haServices,
+                               slotManagerFactory,
+                               metricRegistry,
+                               jobLeaderIdService,
+                               this);
+       }
+
+       private JobManagerRunner createJobManagerRunner(Configuration config) 
throws Exception{
+               // first get JobGraph from local resources
+               //TODO: generate the job graph from user's jar
+               jobGraph = loadJobGraph(config);
+
+               // we first need to mark the job as running in the HA services, 
so that the
+               // JobManager leader will recognize that it as work to do
+               try {
+                       
haServices.getRunningJobsRegistry().setJobRunning(jobGraph.getJobID());
+               }
+               catch (Throwable t) {
+                       throw new JobExecutionException(jobGraph.getJobID(),
+                                       "Could not register the job at the 
high-availability services", t);
+               }
+
+               // now the JobManagerRunner
+               return new JobManagerRunner(
+                               jobGraph, config,
+                               commonRpcService,
+                               haServices,
+                               this,
+                               this);
+       }
+
+       protected void shutdown(ApplicationStatus status, String msg) {
+               synchronized (lock) {
+                       if (jobManagerRunner != null) {
+                               try {
+                                       jobManagerRunner.shutdown();
+                               } catch (Throwable tt) {
+                                       LOG.warn("Failed to stop the 
JobManagerRunner", tt);
+                               }
+                       }
+                       if (resourceManager != null) {
+                               try {
+                                       resourceManager.shutDownCluster(status, 
msg);
+                                       resourceManager.shutDown();
+                               } catch (Throwable tt) {
+                                       LOG.warn("Failed to stop the 
ResourceManager", tt);
+                               }
+                       }
+                       if (commonRpcService != null) {
+                               try {
+                                       commonRpcService.stopService();
+                               } catch (Throwable tt) {
+                                       LOG.error("Error shutting down resource 
manager rpc service", tt);
+                               }
+                       }
+                       if (haServices != null) {
+                               try {
+                                       haServices.shutdown();
+                               } catch (Throwable tt) {
+                                       LOG.warn("Failed to stop the HA 
service", tt);
+                               }
+                       }
+                       if (metricRegistry != null) {
+                               try {
+                                       metricRegistry.shutdown();
+                               } catch (Throwable tt) {
+                                       LOG.warn("Failed to stop the metrics 
registry", tt);
+                               }
+                       }
+               }
+       }
+
+       private static JobGraph loadJobGraph(Configuration config) throws 
Exception {
+               JobGraph jg = null;
+               String jobGraphFile = config.getString(JOB_GRAPH_FILE_PATH, 
"job.graph");
+               if (jobGraphFile != null) {
+                       File fp = new File(jobGraphFile);
+                       if (fp.isFile()) {
+                               FileInputStream input = new FileInputStream(fp);
+                               ObjectInputStream obInput = new 
ObjectInputStream(input);
+                               jg = (JobGraph) obInput.readObject();
+                               input.close();
+                       }
+               }
+               if (jg == null) {
+                       throw new Exception("Fail to load job graph " + 
jobGraphFile);
+               }
+               return jg;
+       }
+
+       
//-------------------------------------------------------------------------------------
+       // Fatal error handler
+       
//-------------------------------------------------------------------------------------
+
+       @Override
+       public void onFatalError(Throwable exception) {
+               LOG.error("Encountered fatal error.", exception);
+
+               shutdown(ApplicationStatus.FAILED, exception.getMessage());
+       }
+
+       
//----------------------------------------------------------------------------------------------
+       // Result and error handling methods
+       
//----------------------------------------------------------------------------------------------
+
+       /**
+        * Job completion notification triggered by JobManager
+        */
+       @Override
+       public void jobFinished(JobExecutionResult result) {
+               shutdown(ApplicationStatus.SUCCEEDED, null);
+       }
+
+       /**
+        * Job completion notification triggered by JobManager
+        */
+       @Override
+       public void jobFailed(Throwable cause) {
+               shutdown(ApplicationStatus.FAILED, cause.getMessage());
+       }
+
+       /**
+        * Job completion notification triggered by self
+        */
+       @Override
+       public void jobFinishedByOther() {
+               shutdown(ApplicationStatus.UNKNOWN, null);
+       }
+}

Reply via email to