Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 0cbaf5996 -> 15b7cd08f


[GOBBLIN-480] Separate job distribution cluster from gobblin cluster manager cluster

Closes #2349 from yukuai518/controller


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/15b7cd08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/15b7cd08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/15b7cd08

Branch: refs/heads/master
Commit: 15b7cd08f724f1de471cc59f46323b234215b2f9
Parents: 0cbaf59
Author: Kuai Yu <[email protected]>
Authored: Tue May 1 14:40:28 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Tue May 1 14:40:28 2018 -0700

----------------------------------------------------------------------
 .../GobblinClusterConfigurationKeys.java        |   3 +
 .../gobblin/cluster/GobblinClusterManager.java  | 377 +++-----------
 .../cluster/GobblinHelixMultiManager.java       | 494 +++++++++++++++++++
 .../cluster/LeadershipChangeAwareComponent.java |  24 +
 .../gobblin/cluster/ClusterIntegrationTest.java |  21 +-
 .../gobblin/cluster/GobblinClusterKillTest.java |   4 +-
 gobblin-cluster/src/test/resources/log4j.xml    |  12 +-
 7 files changed, 618 insertions(+), 317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/15b7cd08/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 76382e7..1ad7445 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -49,6 +49,9 @@ public class GobblinClusterConfigurationKeys {
 
   // Helix configuration properties.
   public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + 
"helix.cluster.name";
+  public static final String MANAGER_CLUSTER_NAME_KEY = GOBBLIN_CLUSTER_PREFIX 
+ "manager.cluster.name";
+  public static final String DEDICATED_MANAGER_CLUSTER_ENABLED = 
GOBBLIN_CLUSTER_PREFIX + "dedicatedManagerCluster.enabled";
+  public static final String DEDICATED_JOB_CLUSTER_CONTROLLER_ENABLED = 
GOBBLIN_CLUSTER_PREFIX + "dedicatedJobClusterController.enabled";
   public static final String ZK_CONNECTION_STRING_KEY = GOBBLIN_CLUSTER_PREFIX 
+ "zk.connection.string";
   public static final String WORK_UNIT_FILE_PATH = GOBBLIN_CLUSTER_PREFIX + 
"work.unit.file.path";
   public static final String HELIX_INSTANCE_NAME_OPTION_NAME = 
"helix_instance_name";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/15b7cd08/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 1592cf8..b1d0f43 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -20,14 +20,11 @@ package org.apache.gobblin.cluster;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
@@ -38,35 +35,20 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.Criteria;
-import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
-import org.apache.helix.LiveInstanceChangeListener;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.messaging.handling.HelixTaskResult;
-import org.apache.helix.messaging.handling.MessageHandler;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
-import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
-import org.apache.helix.task.TargetState;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.WorkflowConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Service;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -74,13 +56,14 @@ import com.typesafe.config.ConfigValueFactory;
 
 import javax.annotation.Nonnull;
 import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
-import org.apache.gobblin.metrics.ContextAwareHistogram;
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
@@ -117,13 +100,15 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
  * @author Yinan Li
  */
 @Alpha
-public class GobblinClusterManager implements ApplicationLauncher, 
StandardMetricsBridge {
+@Slf4j
+public class GobblinClusterManager implements ApplicationLauncher, 
StandardMetricsBridge, LeadershipChangeAwareComponent {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GobblinClusterManager.class);
 
-  private HelixManager helixManager;
+  @VisibleForTesting
+  protected GobblinHelixMultiManager multiManager;
 
-  private volatile boolean stopInProgress = false;
+  private StopStatus stopStatus = new StopStatus(false);
 
   protected ServiceBasedAppLauncher applicationLauncher;
 
@@ -157,13 +142,14 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
   private final String clusterName;
   private final Config config;
   private final MetricContext metricContext;
-  private final Metrics metrics;
+  private final StandardMetrics metrics;
+
   public GobblinClusterManager(String clusterName, String applicationId, 
Config config,
       Optional<Path> appWorkDirOptional) throws Exception {
     this.clusterName = clusterName;
     this.config = config;
     this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(config), 
this.getClass());
-    this.metrics = new Metrics(this.metricContext, this.config);
+    this.metrics = new StandardMetrics();
     this.isStandaloneMode = ConfigUtils.getBoolean(config, 
GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY,
         GobblinClusterConfigurationKeys.DEFAULT_STANDALONE_CLUSTER_MODE);
 
@@ -198,7 +184,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
 
       this.jobCatalog =
           (MutableJobCatalog) 
GobblinConstructorUtils.invokeFirstConstructor(Class.forName(jobCatalogClassName),
-          ImmutableList.<Object>of(config
+          ImmutableList.of(config
               
.getConfig(StringUtils.removeEnd(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX,
 "."))
               .withFallback(this.config)));
     } else {
@@ -243,58 +229,6 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
   }
 
   /**
-   * Handle leadership change.
-   * The applicationLauncher is only started on the leader.
-   * The leader cleans up existing jobs before starting the 
applicationLauncher.
-   * @param changeContext notification context
-   */
-  @VisibleForTesting
-  void handleLeadershipChange(NotificationContext changeContext) {
-    this.metrics.clusterLeadershipChange.update(1);
-    if (this.helixManager.isLeader()) {
-      // can get multiple notifications on a leadership change, so only start 
the application launcher the first time
-      // the notification is received
-      LOGGER.info("Leader notification for {} isLeader {} HM.isLeader {}", 
this.helixManager.getInstanceName(),
-          isLeader, this.helixManager.isLeader());
-
-      if (!isLeader) {
-        LOGGER.info("New Helix Controller leader {}", 
this.helixManager.getInstanceName());
-
-        // Clean up existing jobs
-        TaskDriver taskDriver = new TaskDriver(this.helixManager);
-        Map<String, WorkflowConfig> workflows = taskDriver.getWorkflows();
-
-        for (Map.Entry<String, WorkflowConfig> entry : workflows.entrySet()) {
-          String queueName = entry.getKey();
-          WorkflowConfig workflowConfig = entry.getValue();
-
-          // request delete if not already requested
-          if (workflowConfig.getTargetState() != TargetState.DELETE) {
-            taskDriver.delete(queueName);
-
-            LOGGER.info("Requested delete of queue {}", queueName);
-          }
-        }
-
-        startAppLauncherAndServices();
-        isLeader = true;
-      }
-    } else {
-      // stop and reinitialize services since they are not restartable
-      // this prepares them to start when this cluster manager becomes a leader
-      if (isLeader) {
-        isLeader = false;
-        stopAppLauncherAndServices();
-        try {
-          initializeAppLauncherAndServices();
-        } catch (Exception e) {
-          throw new RuntimeException("Exception reinitializing app launcher 
services ", e);
-        }
-      }
-    }
-  }
-
-  /**
    * Start the Gobblin Cluster Manager.
    */
   @Override
@@ -302,14 +236,14 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
     LOGGER.info("Starting the Gobblin Cluster Manager");
 
     this.eventBus.register(this);
-    connectHelixManager();
+    this.multiManager.connect();
 
     if (this.isStandaloneMode) {
       // standalone mode starts non-daemon threads later, so need to have this 
thread to keep process up
       this.idleProcessThread = new Thread(new Runnable() {
         @Override
         public void run() {
-          while (!GobblinClusterManager.this.stopInProgress && 
!GobblinClusterManager.this.stopIdleProcessThread) {
+          while (!GobblinClusterManager.this.stopStatus.isStopInProgress() && 
!GobblinClusterManager.this.stopIdleProcessThread) {
             try {
               Thread.sleep(300);
             } catch (InterruptedException e) {
@@ -340,11 +274,11 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
    */
   @Override
   public synchronized void stop() {
-    if (this.stopInProgress) {
+    if (this.stopStatus.isStopInProgress()) {
       return;
     }
 
-    this.stopInProgress = true;
+    this.stopStatus.setStopInprogress(true);
 
     LOGGER.info("Stopping the Gobblin Cluster Manager");
 
@@ -364,7 +298,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
 
     stopAppLauncherAndServices();
 
-    disconnectHelixManager();
+    this.multiManager.disconnect();
   }
 
   /**
@@ -377,17 +311,6 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
   }
 
   /**
-   * Build the {@link HelixManager} for the Application Master.
-   */
-  private HelixManager buildHelixManager(Config config, String 
zkConnectionString) {
-    String helixInstanceName = ConfigUtils.getString(config, 
GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY,
-        GobblinClusterManager.class.getSimpleName());
-    return HelixManagerFactory.getZKHelixManager(
-        
config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY), 
helixInstanceName,
-        InstanceType.CONTROLLER, zkConnectionString);
-  }
-
-  /**
    * Build the {@link FileSystem} for the Application Master.
    */
   private FileSystem buildFileSystem(Config config) throws IOException {
@@ -402,7 +325,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
   private GobblinHelixJobScheduler buildGobblinHelixJobScheduler(Config 
config, Path appWorkDir,
       List<? extends Tag<?>> metadataTags, SchedulerService schedulerService) 
throws Exception {
     Properties properties = ConfigUtils.configToProperties(config);
-    return new GobblinHelixJobScheduler(properties, this.helixManager, 
this.eventBus, appWorkDir, metadataTags,
+    return new GobblinHelixJobScheduler(properties, 
this.multiManager.getJobClusterHelixManager(), this.eventBus, appWorkDir, 
metadataTags,
         schedulerService, this.jobCatalog);
   }
 
@@ -440,33 +363,6 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
     return this.eventBus;
   }
 
-  @VisibleForTesting
-  void connectHelixManager() {
-    try {
-      this.isLeader = false;
-      this.helixManager.connect();
-      this.helixManager.addLiveInstanceChangeListener(new 
GobblinLiveInstanceChangeListener());
-      this.helixManager.getMessagingService()
-          
.registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, new 
ControllerShutdownMessageHandlerFactory());
-      this.helixManager.getMessagingService()
-          
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
-              getUserDefinedMessageHandlerFactory());
-
-      // standalone mode listens for controller change
-      if (this.isStandaloneMode) {
-        // Subscribe to leadership changes
-        this.helixManager.addControllerListener(new ControllerChangeListener() 
{
-          @Override
-          public void onControllerChange(NotificationContext changeContext) {
-            handleLeadershipChange(changeContext);
-          }
-        });
-      }
-    } catch (Exception e) {
-      LOGGER.error("HelixManager failed to connect", e);
-      throw Throwables.propagate(e);
-    }
-  }
 
   /**
    * Creates and returns a {@link MessageHandlerFactory} for handling of Helix
@@ -475,28 +371,38 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
    * @returns a {@link MessageHandlerFactory}.
    */
   protected MessageHandlerFactory getUserDefinedMessageHandlerFactory() {
-    return new ControllerUserDefinedMessageHandlerFactory();
+    return new 
GobblinHelixMultiManager.ControllerUserDefinedMessageHandlerFactory();
+  }
+
+  @VisibleForTesting
+  void connectHelixManager() {
+    this.multiManager.connect();
   }
 
   @VisibleForTesting
   void disconnectHelixManager() {
-    if (isHelixManagerConnected()) {
-      this.helixManager.disconnect();
-    }
+    this.multiManager.disconnect();
   }
 
   @VisibleForTesting
   boolean isHelixManagerConnected() {
-    return this.helixManager.isConnected();
+    return this.multiManager.isConnected();
   }
 
+  /**
+   * In separate controller mode, one controller will manage manager's HA, the 
other will handle the job dispatching and
+   * work unit assignment.
+   */
   @VisibleForTesting
   void initializeHelixManager() {
-    String zkConnectionString = 
this.config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
-    LOGGER.info("Using ZooKeeper connection string: " + zkConnectionString);
-
-    // This will create and register a Helix controller in ZooKeeper
-    this.helixManager = buildHelixManager(this.config, zkConnectionString);
+    this.multiManager = new GobblinHelixMultiManager(
+        this.config, new Function<Void, MessageHandlerFactory>() {
+          @Override
+          public MessageHandlerFactory apply(Void aVoid) {
+            return 
GobblinClusterManager.this.getUserDefinedMessageHandlerFactory();
+          }
+        }, this.eventBus, stopStatus) ;
+    this.multiManager.addLeadershipChangeAwareComponent(this);
   }
 
   @VisibleForTesting
@@ -525,7 +431,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
     // for messaging to instances
     //int messagesSent = 
this.helixManager.getMessagingService().send(criteria, shutdownRequest,
     //    new NoopReplyHandler(), timeout);
-    GobblinHelixMessagingService messagingService = new 
GobblinHelixMessagingService(this.helixManager);
+    GobblinHelixMessagingService messagingService = new 
GobblinHelixMessagingService(this.multiManager.getJobClusterHelixManager());
 
     int messagesSent = messagingService.send(criteria, shutdownRequest,
             new NoopReplyHandler(), timeout);
@@ -556,179 +462,6 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
   }
 
   /**
-   * A custom implementation of {@link LiveInstanceChangeListener}.
-   */
-  private static class GobblinLiveInstanceChangeListener implements 
LiveInstanceChangeListener {
-
-    @Override
-    public void onLiveInstanceChange(List<LiveInstance> liveInstances, 
NotificationContext changeContext) {
-      for (LiveInstance liveInstance : liveInstances) {
-        LOGGER.info("Live Helix participant instance: " + 
liveInstance.getInstanceName());
-      }
-    }
-  }
-
-  private class Metrics extends StandardMetrics {
-    public static final String CLUSTER_LEADERSHIP_CHANGE = 
"clusterLeadershipChange";
-    private ContextAwareHistogram clusterLeadershipChange;
-    public Metrics(final MetricContext metricContext, final Config config) {
-      int timeWindowSizeInMinutes = ConfigUtils.getInt(config, 
ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, 
ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
-      this.clusterLeadershipChange = 
metricContext.contextAwareHistogram(CLUSTER_LEADERSHIP_CHANGE, 
timeWindowSizeInMinutes, TimeUnit.MINUTES);
-      this.contextAwareMetrics.add(clusterLeadershipChange);
-    }
-
-    @Override
-    public String getName() {
-      return GobblinClusterManager.class.getName();
-    }
-  }
-
-  /**
-   * A custom {@link MessageHandlerFactory} for {@link MessageHandler}s that 
handle messages of type
-   * "SHUTDOWN" for shutting down the controller.
-   */
-  private class ControllerShutdownMessageHandlerFactory implements 
MessageHandlerFactory {
-
-    @Override
-    public MessageHandler createHandler(Message message, NotificationContext 
context) {
-      return new ControllerShutdownMessageHandler(message, context);
-    }
-
-    @Override
-    public String getMessageType() {
-      return GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE;
-    }
-
-    public List<String> getMessageTypes() {
-      return Collections.singletonList(getMessageType());
-    }
-
-    @Override
-    public void reset() {
-
-    }
-
-    /**
-     * A custom {@link MessageHandler} for handling messages of sub type
-     * {@link HelixMessageSubTypes#APPLICATION_MASTER_SHUTDOWN}.
-     */
-    private class ControllerShutdownMessageHandler extends MessageHandler {
-
-      public ControllerShutdownMessageHandler(Message message, 
NotificationContext context) {
-        super(message, context);
-      }
-
-      @Override
-      public HelixTaskResult handleMessage() throws InterruptedException {
-        String messageSubType = this._message.getMsgSubType();
-        Preconditions.checkArgument(
-            
messageSubType.equalsIgnoreCase(HelixMessageSubTypes.APPLICATION_MASTER_SHUTDOWN.toString()),
-            String.format("Unknown %s message subtype: %s", 
GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, messageSubType));
-
-        HelixTaskResult result = new HelixTaskResult();
-
-        if (stopInProgress) {
-          result.setSuccess(true);
-          return result;
-        }
-
-        LOGGER.info("Handling message " + 
HelixMessageSubTypes.APPLICATION_MASTER_SHUTDOWN.toString());
-
-        ScheduledExecutorService shutdownMessageHandlingCompletionWatcher =
-            MoreExecutors.getExitingScheduledExecutorService(new 
ScheduledThreadPoolExecutor(1));
-
-        // Schedule the task for watching on the removal of the shutdown 
message, which indicates that
-        // the message has been successfully processed and it's safe to 
disconnect the HelixManager.
-        // This is a hacky way of watching for the completion of processing 
the shutdown message and
-        // should be replaced by a fix to 
https://issues.apache.org/jira/browse/HELIX-611.
-        shutdownMessageHandlingCompletionWatcher.scheduleAtFixedRate(new 
Runnable() {
-          @Override
-          public void run() {
-            HelixManager helixManager = _notificationContext.getManager();
-            HelixDataAccessor helixDataAccessor = 
helixManager.getHelixDataAccessor();
-
-            HelixProperty helixProperty = helixDataAccessor
-                .getProperty(_message.getKey(helixDataAccessor.keyBuilder(), 
helixManager.getInstanceName()));
-            // The absence of the shutdown message indicates it has been 
removed
-            if (helixProperty == null) {
-              eventBus.post(new ClusterManagerShutdownRequest());
-            }
-          }
-        }, 0, 1, TimeUnit.SECONDS);
-
-        result.setSuccess(true);
-        return result;
-      }
-
-      @Override
-      public void onError(Exception e, ErrorCode code, ErrorType type) {
-        LOGGER.error(
-            String.format("Failed to handle message with exception %s, error 
code %s, error type %s", e, code, type));
-      }
-    }
-  }
-
-  /**
-   * A custom {@link MessageHandlerFactory} for {@link 
ControllerUserDefinedMessageHandler}s that
-   * handle messages of type {@link 
org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
-   */
-  private static class ControllerUserDefinedMessageHandlerFactory implements 
MessageHandlerFactory {
-
-    @Override
-    public MessageHandler createHandler(Message message, NotificationContext 
context) {
-      return new ControllerUserDefinedMessageHandler(message, context);
-    }
-
-    @Override
-    public String getMessageType() {
-      return Message.MessageType.USER_DEFINE_MSG.toString();
-    }
-
-    public List<String> getMessageTypes() {
-      return Collections.singletonList(getMessageType());
-    }
-
-    @Override
-    public void reset() {
-
-    }
-
-    /**
-     * A custom {@link MessageHandler} for handling user-defined messages to 
the controller.
-     *
-     * <p>
-     *   Currently does not handle any user-defined messages. If this class is 
passed a custom message, it will simply
-     *   print out a warning and return successfully. Sub-classes of {@link 
GobblinClusterManager} should override
-     *   {@link #getUserDefinedMessageHandlerFactory}.
-     * </p>
-     */
-    private static class ControllerUserDefinedMessageHandler extends 
MessageHandler {
-
-      public ControllerUserDefinedMessageHandler(Message message, 
NotificationContext context) {
-        super(message, context);
-      }
-
-      @Override
-      public HelixTaskResult handleMessage() throws InterruptedException {
-        LOGGER.warn(String
-            .format("No handling setup for %s message of subtype: %s", 
Message.MessageType.USER_DEFINE_MSG.toString(),
-                this._message.getMsgSubType()));
-
-        HelixTaskResult helixTaskResult = new HelixTaskResult();
-        helixTaskResult.setSuccess(true);
-        return helixTaskResult;
-      }
-
-      @Override
-      public void onError(Exception e, ErrorCode code, ErrorType type) {
-        LOGGER.error(
-            String.format("Failed to handle message with exception %s, error 
code %s, error type %s", e, code, type));
-      }
-    }
-  }
-
-
-  /**
    * TODO for now the cluster id is hardcoded to 1 both here and in the {@link 
GobblinTaskRunner}. In the future, the
    * cluster id should be created by the {@link GobblinClusterManager} and 
passed to each {@link GobblinTaskRunner} via
    * Helix (at least that would be the easiest approach, there are certainly 
others ways to do it).
@@ -799,4 +532,34 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
       System.exit(1);
     }
   }
+
+  @Override
+  public void becomeActive() {
+    startAppLauncherAndServices();
+  }
+
+  @Override
+  public void becomeStandby() {
+    stopAppLauncherAndServices();
+    try {
+      initializeAppLauncherAndServices();
+    } catch (Exception e) {
+      throw new RuntimeException("Exception reinitializing app launcher 
services ", e);
+    }
+  }
+
+  static class StopStatus {
+    @Getter
+    @Setter
+    AtomicBoolean isStopInProgress;
+    public StopStatus(boolean inProgress) {
+      isStopInProgress = new AtomicBoolean(inProgress);
+    }
+    public void setStopInprogress (boolean inProgress) {
+      isStopInProgress.set(inProgress);
+    }
+    public boolean isStopInProgress () {
+      return isStopInProgress.get();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/15b7cd08/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
new file mode 100644
index 0000000..f659978
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.gobblin.cluster;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.task.TargetState;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.WorkflowConfig;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+
+import javax.annotation.Nonnull;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.ContextAwareHistogram;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.util.ConfigUtils;
+
+/**
+ * Encapsulate all Helix related components: controller, participants, etc.
+ * Provide all kinds of callbacks, listeners, message handlers that each Helix 
components need to register.
+ */
+@Slf4j
+public class GobblinHelixMultiManager implements StandardMetricsBridge {
+
+  /**
+   * Helix manager to handle cluster manager leader election.
+   * Corresponds to cluster with key name {@link 
GobblinClusterConfigurationKeys#MANAGER_CLUSTER_NAME_KEY} iff 
dedicatedManagerCluster is true.
+   * Corresponds to cluster with key name {@link 
GobblinClusterConfigurationKeys#HELIX_CLUSTER_NAME_KEY} iff 
dedicatedManagerCluster is false.
+   */
+  @Getter
+  private HelixManager managerClusterHelixManager = null;
+
+  /**
+   * Helix manager to handle job distribution.
+   * Corresponds to cluster with key name {@link 
GobblinClusterConfigurationKeys#HELIX_CLUSTER_NAME_KEY}.
+   */
+  @Getter
+  private HelixManager jobClusterHelixManager = null;
+
+  /**
+   * Helix controller for job distribution. Effective only iff below two 
conditions are established:
+   * 1. In {@link GobblinHelixMultiManager#dedicatedManagerCluster} mode.
+   * 2. {@link GobblinHelixMultiManager#dedicatedJobClusterController} is 
turned on.
+   * Typically used for unit test and local deployment.
+   */
+  private Optional<HelixManager> jobClusterController = Optional.empty();
+
+  /**
+   * Separate manager cluster and job distribution cluster iff this flag is 
turned on. Otherwise {@link GobblinHelixMultiManager#jobClusterHelixManager}
+   * is same as {@link GobblinHelixMultiManager#managerClusterHelixManager}.
+   */
+  private boolean dedicatedManagerCluster = false;
+
+  /**
+   * Create a dedicated controller for job distribution.
+   */
+  private boolean dedicatedJobClusterController = true;
+
+  @Getter
+  boolean isLeader = false;
+  boolean isStandaloneMode = false;
+  private GobblinClusterManager.StopStatus stopStatus;
+  private Config config;
+  private EventBus eventBus;
+  private final MetricContext metricContext;
+  private final HelixManagerMetrics metrics;
+  private final MessageHandlerFactory userDefinedMessageHandlerFactory;
+  private List<LeadershipChangeAwareComponent> leadershipChangeAwareComponents 
= Lists.newArrayList();
+
+  public GobblinHelixMultiManager(
+      Config config,
+      Function<Void, MessageHandlerFactory> messageHandlerFactoryFunction,
+      EventBus eventBus,
+      GobblinClusterManager.StopStatus stopStatus) {
+    this.config = config;
+    this.eventBus = eventBus;
+    this.stopStatus = stopStatus;
+    this.isStandaloneMode = ConfigUtils.getBoolean(config, 
GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY,
+        GobblinClusterConfigurationKeys.DEFAULT_STANDALONE_CLUSTER_MODE);
+    this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(config), 
this.getClass());
+    this.metrics = new HelixManagerMetrics(this.metricContext, this.config);
+    this.dedicatedManagerCluster = ConfigUtils.getBoolean(config,
+       
GobblinClusterConfigurationKeys.DEDICATED_MANAGER_CLUSTER_ENABLED,false);
+    this.userDefinedMessageHandlerFactory = 
messageHandlerFactoryFunction.apply(null);
+    initialize();
+  }
+
+  protected void addLeadershipChangeAwareComponent 
(LeadershipChangeAwareComponent component) {
+    this.leadershipChangeAwareComponents.add(component);
+  }
+
+  /**
+   * Build the {@link HelixManager} for the Application Master.
+   */
+  protected static HelixManager buildHelixManager(Config config, String 
zkConnectionString, String clusterName, InstanceType type) {
+    String helixInstanceName = ConfigUtils.getString(config, 
GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY,
+        GobblinClusterManager.class.getSimpleName());
+    return HelixManagerFactory.getZKHelixManager(
+        config.getString(clusterName), helixInstanceName, type, 
zkConnectionString);
+  }
+
+  public void initialize() {
+    
Preconditions.checkArgument(this.config.hasPath(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY));
+    String zkConnectionString = 
this.config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    log.info("Using ZooKeeper connection string: " + zkConnectionString);
+
+    if (this.dedicatedManagerCluster) {
+      
Preconditions.checkArgument(this.config.hasPath(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY));
+      log.info("We will use separate clusters to manage GobblinClusterManager 
and job distribution.");
+      // This will create and register a Helix controller in ZooKeeper
+      this.managerClusterHelixManager = buildHelixManager(this.config,
+          zkConnectionString,
+          GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY,
+          InstanceType.CONTROLLER);
+
+      // This will create a Helix administrator to dispatch jobs to ZooKeeper
+      this.jobClusterHelixManager = buildHelixManager(this.config,
+          zkConnectionString,
+          GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
+          InstanceType.ADMINISTRATOR);
+
+      // This will create a dedicated controller for job distribution
+      this.dedicatedJobClusterController = ConfigUtils.getBoolean(
+          this.config,
+          
GobblinClusterConfigurationKeys.DEDICATED_JOB_CLUSTER_CONTROLLER_ENABLED,
+          true);
+
+      if (this.dedicatedJobClusterController) {
+        this.jobClusterController = Optional.of(GobblinHelixMultiManager
+            .buildHelixManager(this.config, zkConnectionString, 
GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
+                InstanceType.CONTROLLER));
+      }
+    } else {
+      log.info("We will use same cluster to manage GobblinClusterManager and 
job distribution.");
+      // This will create and register a Helix controller in ZooKeeper
+      this.managerClusterHelixManager = buildHelixManager(this.config,
+          zkConnectionString,
+          GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
+          InstanceType.CONTROLLER);
+      this.jobClusterHelixManager = this.managerClusterHelixManager;
+    }
+  }
+
+  @VisibleForTesting
+  protected void connect() {
+    try {
+      this.isLeader = false;
+      this.managerClusterHelixManager.connect();
+      if (this.dedicatedManagerCluster) {
+        if (jobClusterController.isPresent()) {
+          this.jobClusterController.get().connect();
+        }
+        this.jobClusterHelixManager.connect();
+      }
+
+      this.jobClusterHelixManager.addLiveInstanceChangeListener(new 
GobblinLiveInstanceChangeListener());
+      this.jobClusterHelixManager.getMessagingService()
+          
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), 
userDefinedMessageHandlerFactory);
+
+      this.jobClusterHelixManager.getMessagingService()
+          
.registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, new 
ControllerShutdownMessageHandlerFactory());
+      // standalone mode listens for controller change
+      if (this.isStandaloneMode) {
+        // Subscribe to leadership changes
+        this.managerClusterHelixManager.addControllerListener(new 
ControllerChangeListener() {
+          @Override
+          public void onControllerChange(NotificationContext changeContext) {
+            handleLeadershipChange(changeContext);
+          }
+        });
+      }
+    } catch (Exception e) {
+      log.error("HelixManager failed to connect", e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  protected boolean isConnected() {
+    return managerClusterHelixManager.isConnected() && 
jobClusterHelixManager.isConnected();
+  }
+
+  protected void disconnect() {
+    if (managerClusterHelixManager.isConnected()) {
+      this.managerClusterHelixManager.disconnect();
+    }
+
+    if (this.dedicatedManagerCluster) {
+      if (jobClusterHelixManager.isConnected()) {
+        this.jobClusterHelixManager.disconnect();
+      }
+
+      if (jobClusterController.isPresent() && 
jobClusterController.get().isConnected()) {
+        this.jobClusterController.get().disconnect();
+      }
+
+    }
+  }
+
+  /**
+   * A custom implementation of {@link LiveInstanceChangeListener}.
+   */
+  private static class GobblinLiveInstanceChangeListener implements 
LiveInstanceChangeListener {
+
+    @Override
+    public void onLiveInstanceChange(List<LiveInstance> liveInstances, 
NotificationContext changeContext) {
+      for (LiveInstance liveInstance : liveInstances) {
+        log.info("Live Helix participant instance: " + 
liveInstance.getInstanceName());
+      }
+    }
+  }
+
+  /**
+   * Handle leadership change.
+   * The applicationLauncher is only started on the leader.
+   * The leader cleans up existing jobs before starting the 
applicationLauncher.
+   * @param changeContext notification context
+   */
+  @VisibleForTesting
+  void handleLeadershipChange(NotificationContext changeContext) {
+    this.metrics.clusterLeadershipChange.update(1);
+    if (this.managerClusterHelixManager.isLeader()) {
+      // can get multiple notifications on a leadership change, so only start 
the application launcher the first time
+      // the notification is received
+      log.info("Leader notification for {} isLeader {} HM.isLeader {}",
+          managerClusterHelixManager.getInstanceName(),
+          isLeader,
+          managerClusterHelixManager.isLeader());
+
+      if (!isLeader) {
+        log.info("New Helix Controller leader {}", 
this.managerClusterHelixManager.getInstanceName());
+
+        // Clean up existing jobs
+        TaskDriver taskDriver = new TaskDriver(this.jobClusterHelixManager);
+        Map<String, WorkflowConfig> workflows = taskDriver.getWorkflows();
+
+        for (Map.Entry<String, WorkflowConfig> entry : workflows.entrySet()) {
+          String queueName = entry.getKey();
+          WorkflowConfig workflowConfig = entry.getValue();
+
+          // request delete if not already requested
+          if (workflowConfig.getTargetState() != TargetState.DELETE) {
+            taskDriver.delete(queueName);
+
+            log.info("Requested delete of queue {}", queueName);
+          }
+        }
+
+        for (LeadershipChangeAwareComponent c: 
this.leadershipChangeAwareComponents) {
+          c.becomeActive();
+        }
+
+        isLeader = true;
+      }
+    } else {
+      // stop and reinitialize services since they are not restartable
+      // this prepares them to start when this cluster manager becomes a leader
+      if (isLeader) {
+        isLeader = false;
+        for (LeadershipChangeAwareComponent c: 
this.leadershipChangeAwareComponents) {
+          c.becomeStandby();
+        }
+      }
+    }
+  }
+
+
+  /**
+   * A custom {@link MessageHandlerFactory} for {@link MessageHandler}s that 
handle messages of type
+   * "SHUTDOWN" for shutting down the controller.
+   */
+  private class ControllerShutdownMessageHandlerFactory implements 
MessageHandlerFactory {
+
+    @Override
+    public MessageHandler createHandler(Message message, NotificationContext 
context) {
+      return new ControllerShutdownMessageHandler(message, context);
+    }
+
+    @Override
+    public String getMessageType() {
+      return GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE;
+    }
+
+    public List<String> getMessageTypes() {
+      return Collections.singletonList(getMessageType());
+    }
+
+    @Override
+    public void reset() {
+
+    }
+
+    /**
+     * A custom {@link MessageHandler} for handling messages of sub type
+     * {@link HelixMessageSubTypes#APPLICATION_MASTER_SHUTDOWN}.
+     */
+    private class ControllerShutdownMessageHandler extends MessageHandler {
+
+      public ControllerShutdownMessageHandler(Message message, 
NotificationContext context) {
+        super(message, context);
+      }
+
+      @Override
+      public HelixTaskResult handleMessage() throws InterruptedException {
+        String messageSubType = this._message.getMsgSubType();
+        Preconditions.checkArgument(
+            
messageSubType.equalsIgnoreCase(HelixMessageSubTypes.APPLICATION_MASTER_SHUTDOWN.toString()),
+            String.format("Unknown %s message subtype: %s", 
GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, messageSubType));
+
+        HelixTaskResult result = new HelixTaskResult();
+
+        if (stopStatus.isStopInProgress()) {
+          result.setSuccess(true);
+          return result;
+        }
+
+        log.info("Handling message " + 
HelixMessageSubTypes.APPLICATION_MASTER_SHUTDOWN.toString());
+
+        ScheduledExecutorService shutdownMessageHandlingCompletionWatcher =
+            MoreExecutors.getExitingScheduledExecutorService(new 
ScheduledThreadPoolExecutor(1));
+
+        // Schedule the task for watching on the removal of the shutdown 
message, which indicates that
+        // the message has been successfully processed and it's safe to 
disconnect the HelixManager.
+        // This is a hacky way of watching for the completion of processing 
the shutdown message and
+        // should be replaced by a fix to 
https://issues.apache.org/jira/browse/HELIX-611.
+        shutdownMessageHandlingCompletionWatcher.scheduleAtFixedRate(new 
Runnable() {
+          @Override
+          public void run() {
+            HelixManager helixManager = _notificationContext.getManager();
+            HelixDataAccessor helixDataAccessor = 
helixManager.getHelixDataAccessor();
+
+            HelixProperty helixProperty = helixDataAccessor
+                .getProperty(_message.getKey(helixDataAccessor.keyBuilder(), 
helixManager.getInstanceName()));
+            // The absence of the shutdown message indicates it has been 
removed
+            if (helixProperty == null) {
+              eventBus.post(new ClusterManagerShutdownRequest());
+            }
+          }
+        }, 0, 1, TimeUnit.SECONDS);
+
+        result.setSuccess(true);
+        return result;
+      }
+
+      @Override
+      public void onError(Exception e, ErrorCode code, ErrorType type) {
+        log.error(
+            String.format("Failed to handle message with exception %s, error 
code %s, error type %s", e, code, type));
+      }
+    }
+  }
+
+  /**
+   * A custom {@link MessageHandlerFactory} for {@link 
ControllerUserDefinedMessageHandler}s that
+   * handle messages of type {@link 
org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
+   */
+  static class ControllerUserDefinedMessageHandlerFactory implements 
MessageHandlerFactory {
+
+    @Override
+    public MessageHandler createHandler(Message message, NotificationContext 
context) {
+      return new ControllerUserDefinedMessageHandler(message, context);
+    }
+
+    @Override
+    public String getMessageType() {
+      return Message.MessageType.USER_DEFINE_MSG.toString();
+    }
+
+    public List<String> getMessageTypes() {
+      return Collections.singletonList(getMessageType());
+    }
+
+    @Override
+    public void reset() {
+
+    }
+
+    /**
+     * A custom {@link MessageHandler} for handling user-defined messages to 
the controller.
+     *
+     * <p>
+     *   Currently does not handle any user-defined messages. If this class is 
passed a custom message, it will simply
+     *   print out a warning and return successfully. Sub-classes of {@link 
GobblinClusterManager} should override
+     *   {@link GobblinClusterManager#getUserDefinedMessageHandlerFactory()}.
+     * </p>
+     */
+    private static class ControllerUserDefinedMessageHandler extends 
MessageHandler {
+
+      public ControllerUserDefinedMessageHandler(Message message, 
NotificationContext context) {
+        super(message, context);
+      }
+
+      @Override
+      public HelixTaskResult handleMessage() throws InterruptedException {
+        log.warn(String
+            .format("No handling setup for %s message of subtype: %s", 
Message.MessageType.USER_DEFINE_MSG.toString(),
+                this._message.getMsgSubType()));
+
+        HelixTaskResult helixTaskResult = new HelixTaskResult();
+        helixTaskResult.setSuccess(true);
+        return helixTaskResult;
+      }
+
+      @Override
+      public void onError(Exception e, ErrorCode code, ErrorType type) {
+        log.error(
+            String.format("Failed to handle message with exception %s, error 
code %s, error type %s", e, code, type));
+      }
+    }
+  }
+
+
+  /**
+   * Helix related metrics
+   */
+  private class HelixManagerMetrics extends 
StandardMetricsBridge.StandardMetrics {
+    public static final String CLUSTER_LEADERSHIP_CHANGE = 
"clusterLeadershipChange";
+    private ContextAwareHistogram clusterLeadershipChange;
+    public HelixManagerMetrics(final MetricContext metricContext, final Config 
config) {
+      int timeWindowSizeInMinutes = ConfigUtils.getInt(config, 
ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, 
ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
+      this.clusterLeadershipChange = 
metricContext.contextAwareHistogram(CLUSTER_LEADERSHIP_CHANGE, 
timeWindowSizeInMinutes, TimeUnit.MINUTES);
+      this.contextAwareMetrics.add(clusterLeadershipChange);
+    }
+
+    @Override
+    public String getName() {
+      return GobblinClusterManager.class.getName();
+    }
+  }
+
+  @Nonnull
+  @Override
+  public MetricContext getMetricContext() {
+    return this.metricContext;
+  }
+
+  @Override
+  public boolean isInstrumentationEnabled() {
+    return true;
+  }
+
+  @Override
+  public StandardMetrics getStandardMetrics() {
+    return this.metrics;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/15b7cd08/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/LeadershipChangeAwareComponent.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/LeadershipChangeAwareComponent.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/LeadershipChangeAwareComponent.java
new file mode 100644
index 0000000..103e7ff
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/LeadershipChangeAwareComponent.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.gobblin.cluster;
+
+/**
+ * A component whose lifecycle follows the cluster manager's Helix leadership.
+ * {@link GobblinHelixMultiManager} calls these methods when this instance gains or loses leadership.
+ */
+public interface LeadershipChangeAwareComponent {
+
+  /** Called once when this instance becomes the leader. */
+  void becomeActive();
+
+  /** Called once when this instance, having been the leader, loses leadership. */
+  void becomeStandby();
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/15b7cd08/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
index 3e40363..58a5210 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -73,7 +73,7 @@ public class ClusterIntegrationTest {
   private GobblinTaskRunner _worker;
   private GobblinClusterManager _manager;
   private boolean _runTaskInSeparateProcess;
-
+  private boolean _dedicatedClusterManager = false;
 
   @Test
   public void simpleJobShouldComplete() throws Exception {
@@ -87,6 +87,13 @@ public class ClusterIntegrationTest {
     runSimpleJobAndVerifyResult();
   }
 
+  /**
+   * Runs the simple job with the manager cluster separated from the job distribution cluster.
+   * The mode flag is an instance field shared with the other tests in this class, so it is reset
+   * afterwards to keep later tests in the default single-cluster mode.
+   */
+  @Test
+  public void dedicatedManagerClusterMode() throws Exception {
+    _dedicatedClusterManager = true;
+    try {
+      runSimpleJobAndVerifyResult();
+    } finally {
+      _dedicatedClusterManager = false;
+    }
+  }
+
   private void runSimpleJobAndVerifyResult()
       throws Exception {
     init();
@@ -154,6 +161,10 @@ public class ClusterIntegrationTest {
     if (_runTaskInSeparateProcess) {
       
configMap.put(GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS, 
"true");
     }
+    if (_dedicatedClusterManager) {
+      
configMap.put(GobblinClusterConfigurationKeys.DEDICATED_MANAGER_CLUSTER_ENABLED,
 "true");
+      configMap.put(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY, 
"ManagerCluster");
+    }
     Config config = ConfigFactory.parseMap(configMap);
     return config;
   }
@@ -175,12 +186,18 @@ public class ClusterIntegrationTest {
     }
   }
 
+  /**
+   * Creates the Helix cluster named by {@link GobblinClusterConfigurationKeys#HELIX_CLUSTER_NAME_KEY} and,
+   * when {@code _dedicatedClusterManager} is set, also the manager cluster named by
+   * {@link GobblinClusterConfigurationKeys#MANAGER_CLUSTER_NAME_KEY}.
+   */
-  private void createHelixCluster() {
+  private void createHelixCluster() throws Exception {
     String zkConnectionString = _config
         .getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
     String helix_cluster_name = _config
         .getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
     HelixUtils.createGobblinHelixCluster(zkConnectionString, 
helix_cluster_name);
+
+    // In dedicated mode the manager cluster is a separate Helix cluster and must be created as well.
+    if (_dedicatedClusterManager) {
+      String manager_cluster_name = _config
+          .getString(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY);
+      HelixUtils.createGobblinHelixCluster(zkConnectionString, 
manager_cluster_name);
+    }
   }
 
   private void startCluster() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/15b7cd08/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java
index 11b8808..3fde3e7 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java
@@ -280,7 +280,7 @@ public class GobblinClusterKillTest {
     // need to reinitialize the heap manager and call handleLeadershipChange 
to shut down services in the test
     // since the leadership change is simulated
     _clusterManagers[0].initializeHelixManager();
-    _clusterManagers[0].handleLeadershipChange(null);
+    _clusterManagers[0].multiManager.handleLeadershipChange(null);
 
     // reconnect to get leadership role
     _clusterManagers[0].connectHelixManager();
@@ -318,7 +318,7 @@ public class GobblinClusterKillTest {
   public void tearDown() throws IOException, InterruptedException {
 
     for (int i = 0; i < NUM_MANAGERS; i++) {
-      _clusterManagers[i].connectHelixManager();
+      _clusterManagers[i].multiManager.connect();
       if (!_clusterManagers[i].isHelixManagerConnected()) {
         _clusterManagers[i].connectHelixManager();
       }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/15b7cd08/gobblin-cluster/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/resources/log4j.xml 
b/gobblin-cluster/src/test/resources/log4j.xml
index 32367af..a5a3d93 100644
--- a/gobblin-cluster/src/test/resources/log4j.xml
+++ b/gobblin-cluster/src/test/resources/log4j.xml
@@ -23,18 +23,18 @@
     <param name="Target" value="System.out" />
     <layout class="org.apache.log4j.PatternLayout">
       <param name="ConversionPattern"
-        value="%d{yyyy-MM-dd HH:mm:ss z} %-5p [%t] %C %L - %m%n" />
+             value="%d{yyyy-MM-dd HH:mm:ss z} %-5p [%t] %C %L - %m%n" />
     </layout>
   </appender>
 
-  <!--<logger name="org.apache.hadoop.conf.Configuration">-->
-    <!--<level value="FATAL"/>-->
-  <!--</logger>-->
-
   <root>
-    <level value="warn" />
+    <level value="info" />
     <appender-ref ref="console" />
   </root>
 
+  <!-- Swallow annoying exceptions when creating a configuration. -->
+  <logger name="org.apache.hadoop.conf.Configuration">
+    <level value="FATAL"/>
+  </logger>
 
 </log4j:configuration>

Reply via email to