This is an automated email from the ASF dual-hosted git repository.

kuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 713e1d9  [GOBBLIN-1110] fix deadlock in job cancellation; replace deprecated class MessageHandlerFactory with MultiTypeMessageHandlerFactory
713e1d9 is described below

commit 713e1d96c00a0e7dda58649991c09b2b24de68b0
Author: Arjun <[email protected]>
AuthorDate: Tue Apr 7 15:23:25 2020 -0700

    [GOBBLIN-1110] fix deadlock in job cancellation
    and replace deprecated class MessageHandlerFactory with MultiTypeMessageHandlerFactory
    
    Closes #2950 from arjun4084346/taskDriverStop
---
 .../apache/gobblin/aws/GobblinAWSTaskRunner.java   |  7 ++--
 .../gobblin/cluster/GobblinHelixJobLauncher.java   |  4 +--
 .../apache/gobblin/cluster/GobblinTaskRunner.java  | 42 +++++++++++-----------
 .../org/apache/gobblin/metrics/GobblinMetrics.java |  6 +---
 .../gobblin/runtime/AbstractJobLauncher.java       |  2 ++
 .../apache/gobblin/yarn/GobblinYarnTaskRunner.java | 12 +++----
 6 files changed, 37 insertions(+), 36 deletions(-)

diff --git 
a/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java 
b/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java
index 4bff1ff..0e8f1eb 100644
--- a/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java
+++ b/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java
@@ -30,6 +30,7 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
 import org.apache.helix.model.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,15 +89,15 @@ public class GobblinAWSTaskRunner extends GobblinTaskRunner 
{
   }
 
   @Override
-  public MessageHandlerFactory getUserDefinedMessageHandlerFactory() {
+  public MultiTypeMessageHandlerFactory getUserDefinedMessageHandlerFactory() {
     return new ParticipantUserDefinedMessageHandlerFactory();
   }
 
   /**
-   * A custom {@link MessageHandlerFactory} for {@link 
ParticipantUserDefinedMessageHandler}s that
+   * A custom {@link MultiTypeMessageHandlerFactory} for {@link 
ParticipantUserDefinedMessageHandler}s that
    * handle messages of type {@link 
org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
    */
-  private static class ParticipantUserDefinedMessageHandlerFactory implements 
MessageHandlerFactory {
+  private static class ParticipantUserDefinedMessageHandlerFactory implements 
MultiTypeMessageHandlerFactory {
 
     @Override
     public MessageHandler createHandler(Message message, NotificationContext 
context) {
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 418ce62..33758b0 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -255,9 +255,9 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
           // work flow should never be deleted explicitly because it has a 
expiry time
           // If cancellation is requested, we should set the job state to 
CANCELLED/ABORT
           this.helixTaskDriver.waitToStop(this.helixWorkFlowName, 
this.helixJobStopTimeoutSeconds);
-          log.info("stopped the workflow ", this.helixWorkFlowName);
+          log.info("stopped the workflow {}", this.helixWorkFlowName);
         }
-      } catch (HelixException e) {
+      } catch (RuntimeException e) {
         // Cancellation may throw an exception, but Helix set the job state to 
STOP and it should eventually stop
         // We will keep this.cancellationExecuted and 
this.cancellationRequested to true and not propagate the exception
         log.error("Failed to stop workflow {} in Helix", helixWorkFlowName, e);
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 7ed555e..8e6cdd1 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -55,7 +55,7 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
 import org.apache.helix.model.Message;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskStateModelFactory;
@@ -172,7 +172,7 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
     this.clusterName = 
this.clusterConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
     logger.info("Configured GobblinTaskRunner work dir to: {}", 
this.appWorkPath.toString());
 
-    //Set system properties passed in via application config. As an example, 
Helix uses System#getProperty() for ZK configuration
+    // Set system properties passed in via application config. As an example, 
Helix uses System#getProperty() for ZK configuration
     // overrides such as sessionTimeout. In this case, the overrides specified
     // in the application configuration have to be extracted and set before 
initializing HelixManager.
     HelixUtils.setSystemProperties(config);
@@ -182,7 +182,7 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
     this.containerMetrics = buildContainerMetrics();
 
     logger.info("GobblinTaskRunner({}): applicationName {}, helixInstanceName 
{}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}",
-        this.isTaskDriver? "taskDriver" : "worker",
+        this.isTaskDriver ? "taskDriver" : "worker",
         applicationName,
         helixInstanceName,
         applicationId,
@@ -324,7 +324,13 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
   }
 
   public synchronized void stop() {
-    if (this.isStopped || this.stopInProgress) {
+    if (this.isStopped) {
+      logger.info("Gobblin Task runner is already stopped.");
+      return;
+    }
+
+    if (this.stopInProgress) {
+      logger.info("Gobblin Task runner stop already in progress.");
       return;
     }
 
@@ -340,8 +346,8 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
     try {
       stopServices();
     } finally {
+      logger.info("All services are stopped.");
       this.taskStateModelFactory.shutdown();
-
       disconnectHelixManager();
     }
 
@@ -471,12 +477,12 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
   }
 
   /**
-   * Creates and returns a {@link MessageHandlerFactory} for handling of Helix
+   * Creates and returns a {@link MultiTypeMessageHandlerFactory} for handling of Helix
    * {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}s.
    *
-   * @returns a {@link MessageHandlerFactory}.
+   * @return a {@link MultiTypeMessageHandlerFactory}.
    */
-  protected MessageHandlerFactory getUserDefinedMessageHandlerFactory() {
+  protected MultiTypeMessageHandlerFactory 
getUserDefinedMessageHandlerFactory() {
     return new ParticipantUserDefinedMessageHandlerFactory();
   }
 
@@ -518,10 +524,10 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
   }
 
   /**
-   * A custom {@link MessageHandlerFactory} for {@link 
ParticipantShutdownMessageHandler}s that handle messages
+   * A custom {@link MultiTypeMessageHandlerFactory} for {@link 
ParticipantShutdownMessageHandler}s that handle messages
    * of type "SHUTDOWN" for shutting down the participants.
    */
-  private class ParticipantShutdownMessageHandlerFactory implements 
MessageHandlerFactory {
+  private class ParticipantShutdownMessageHandlerFactory implements 
MultiTypeMessageHandlerFactory {
 
     @Override
     public MessageHandler createHandler(Message message, NotificationContext 
context) {
@@ -553,13 +559,11 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
       }
 
       @Override
-      public HelixTaskResult handleMessage()
-          throws InterruptedException {
+      public HelixTaskResult handleMessage() {
         String messageSubType = this._message.getMsgSubType();
         Preconditions.checkArgument(messageSubType
             
.equalsIgnoreCase(HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString()), 
String
-            .format("Unknown %s message subtype: %s", 
GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
-                messageSubType));
+            .format("Unknown %s message subtype: %s", 
GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, messageSubType));
 
         HelixTaskResult result = new HelixTaskResult();
 
@@ -568,8 +572,7 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
           return result;
         }
 
-        logger
-            .info("Handling message " + 
HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString());
+        logger.info("Handling message " + 
HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString());
 
         ScheduledExecutorService shutdownMessageHandlingCompletionWatcher =
             MoreExecutors.getExitingScheduledExecutorService(new 
ScheduledThreadPoolExecutor(1));
@@ -607,10 +610,10 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
   }
 
   /**
-   * A custom {@link MessageHandlerFactory} for {@link 
ParticipantUserDefinedMessageHandler}s that
+   * A custom {@link MultiTypeMessageHandlerFactory} for {@link 
ParticipantUserDefinedMessageHandler}s that
    * handle messages of type {@link 
org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
    */
-  private static class ParticipantUserDefinedMessageHandlerFactory implements 
MessageHandlerFactory {
+  private static class ParticipantUserDefinedMessageHandlerFactory implements 
MultiTypeMessageHandlerFactory {
 
     @Override
     public MessageHandler createHandler(Message message, NotificationContext 
context) {
@@ -647,8 +650,7 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
       }
 
       @Override
-      public HelixTaskResult handleMessage()
-          throws InterruptedException {
+      public HelixTaskResult handleMessage() {
         logger.warn(String.format("No handling setup for %s message of 
subtype: %s",
             Message.MessageType.USER_DEFINE_MSG.toString(), 
this._message.getMsgSubType()));
 
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
index f75da78..e969083 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
@@ -771,15 +771,13 @@ public class GobblinMetrics {
           return;
         }
         this.codahaleReportersCloser.register(scheduledReporter);
-        String reporterSinkMsg = reporterSink.isPresent()?"to " + 
reporterSink.get():"";
+        String reporterSinkMsg = reporterSink.isPresent() ? "to " + 
reporterSink.get() : "";
         LOGGER.info("Will start reporting metrics " + reporterSinkMsg + " 
using " + reporterClass);
         this.codahaleScheduledReporters.add(scheduledReporter);
-
       } else if (CustomReporterFactory.class.isAssignableFrom(clazz)) {
         CustomReporterFactory customReporterFactory = ((CustomReporterFactory) 
clazz.getConstructor().newInstance());
         customReporterFactory.newScheduledReporter(properties);
         LOGGER.info("Will start reporting metrics using " + reporterClass);
-
       } else {
         throw new IllegalArgumentException("Class " + reporterClass +
             " specified by key " + ConfigurationKeys.METRICS_CUSTOM_BUILDERS + 
" must implement: "
@@ -789,11 +787,9 @@ public class GobblinMetrics {
       LOGGER.warn(String
           .format("Failed to create metric reporter: requested 
CustomReporterFactory %s not found.", reporterClass),
           exception);
-
     } catch (NoSuchMethodException exception) {
       LOGGER.warn(String.format("Failed to create metric reporter: requested 
CustomReporterFactory %s "
           + "does not have parameterless constructor.", reporterClass), 
exception);
-
     } catch (Exception exception) {
       LOGGER.warn("Could not create metric reporter from builder " + 
reporterClass + ".", exception);
     }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 8ec6fed..7d3081b 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -714,6 +714,8 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
 
   /**
    * Execute the job cancellation.
+   * Implementations must not throw exceptions: an exception thrown here kills the
+   * "Cancellation Executor" thread and leads to a deadlock.
    */
   protected abstract void executeCancellation();
 
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index d59a429..8e88b2e 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
 import org.apache.helix.model.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,7 +84,7 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
             
services.add(gobblinYarnLogSource.buildLogCopier(this.clusterConfig, 
this.taskRunnerId, this.fs,
                 new Path(containerLogDir, 
GobblinClusterUtils.getAppWorkDirPath(this.applicationName, 
this.applicationId))));
         } catch (Exception e) {
-          LOGGER.warn("Cannot add LogCopier service to the service manager due 
to {}", e);
+          LOGGER.warn("Cannot add LogCopier service to the service manager due 
to", e);
         }
       }
     }
@@ -92,15 +92,15 @@ public class GobblinYarnTaskRunner extends 
GobblinTaskRunner {
   }
 
   @Override
-  public MessageHandlerFactory getUserDefinedMessageHandlerFactory() {
+  public MultiTypeMessageHandlerFactory getUserDefinedMessageHandlerFactory() {
     return new ParticipantUserDefinedMessageHandlerFactory();
   }
 
   /**
-   * A custom {@link MessageHandlerFactory} for {@link 
ParticipantUserDefinedMessageHandler}s that
+   * A custom {@link MultiTypeMessageHandlerFactory} for {@link 
ParticipantUserDefinedMessageHandler}s that
    * handle messages of type {@link 
org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
    */
-  private class ParticipantUserDefinedMessageHandlerFactory implements 
MessageHandlerFactory {
+  private class ParticipantUserDefinedMessageHandlerFactory implements 
MultiTypeMessageHandlerFactory {
 
     @Override
     public MessageHandler createHandler(Message message, NotificationContext 
context) {
@@ -139,7 +139,7 @@ public class GobblinYarnTaskRunner extends 
GobblinTaskRunner {
       }
 
       @Override
-      public HelixTaskResult handleMessage() throws InterruptedException {
+      public HelixTaskResult handleMessage() {
         String messageSubType = this._message.getMsgSubType();
 
         if 
(messageSubType.equalsIgnoreCase(org.apache.gobblin.cluster.HelixMessageSubTypes.TOKEN_FILE_UPDATED.toString()))
 {

Reply via email to