Repository: incubator-gobblin Updated Branches: refs/heads/master 0cbaf5996 -> 15b7cd08f
[GOBBLIN-480] Separate job distribution cluster with gobblin cluster manager cluster Closes #2349 from yukuai518/controller Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/15b7cd08 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/15b7cd08 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/15b7cd08 Branch: refs/heads/master Commit: 15b7cd08f724f1de471cc59f46323b234215b2f9 Parents: 0cbaf59 Author: Kuai Yu <[email protected]> Authored: Tue May 1 14:40:28 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Tue May 1 14:40:28 2018 -0700 ---------------------------------------------------------------------- .../GobblinClusterConfigurationKeys.java | 3 + .../gobblin/cluster/GobblinClusterManager.java | 377 +++----------- .../cluster/GobblinHelixMultiManager.java | 494 +++++++++++++++++++ .../cluster/LeadershipChangeAwareComponent.java | 24 + .../gobblin/cluster/ClusterIntegrationTest.java | 21 +- .../gobblin/cluster/GobblinClusterKillTest.java | 4 +- gobblin-cluster/src/test/resources/log4j.xml | 12 +- 7 files changed, 618 insertions(+), 317 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/15b7cd08/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index 76382e7..1ad7445 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -49,6 +49,9 @@ public class GobblinClusterConfigurationKeys { // Helix configuration properties. public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + "helix.cluster.name"; + public static final String MANAGER_CLUSTER_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + "manager.cluster.name"; + public static final String DEDICATED_MANAGER_CLUSTER_ENABLED = GOBBLIN_CLUSTER_PREFIX + "dedicatedManagerCluster.enabled"; + public static final String DEDICATED_JOB_CLUSTER_CONTROLLER_ENABLED = GOBBLIN_CLUSTER_PREFIX + "dedicatedJobClusterController.enabled"; public static final String ZK_CONNECTION_STRING_KEY = GOBBLIN_CLUSTER_PREFIX + "zk.connection.string"; public static final String WORK_UNIT_FILE_PATH = GOBBLIN_CLUSTER_PREFIX + "work.unit.file.path"; public static final String HELIX_INSTANCE_NAME_OPTION_NAME = "helix_instance_name"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/15b7cd08/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java index 1592cf8..b1d0f43 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java @@ -20,14 +20,11 @@ package org.apache.gobblin.cluster; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.net.URI; -import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.UUID; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; @@ -38,35 +35,20 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.helix.ControllerChangeListener; import org.apache.helix.Criteria; -import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; -import org.apache.helix.HelixManagerFactory; -import org.apache.helix.HelixProperty; import org.apache.helix.InstanceType; -import org.apache.helix.LiveInstanceChangeListener; -import org.apache.helix.NotificationContext; -import org.apache.helix.messaging.handling.HelixTaskResult; -import org.apache.helix.messaging.handling.MessageHandler; import org.apache.helix.messaging.handling.MessageHandlerFactory; -import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Message; -import org.apache.helix.task.TargetState; -import org.apache.helix.task.TaskDriver; -import org.apache.helix.task.WorkflowConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Service; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -74,13 +56,14 @@ import com.typesafe.config.ConfigValueFactory; import javax.annotation.Nonnull; import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.instrumented.StandardMetricsBridge; -import org.apache.gobblin.metrics.ContextAwareHistogram; import org.apache.gobblin.metrics.GobblinMetrics; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; @@ -117,13 +100,15 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils; * @author Yinan Li */ @Alpha -public class GobblinClusterManager implements ApplicationLauncher, StandardMetricsBridge { +@Slf4j +public class GobblinClusterManager implements ApplicationLauncher, StandardMetricsBridge, LeadershipChangeAwareComponent { private static final Logger LOGGER = LoggerFactory.getLogger(GobblinClusterManager.class); - private HelixManager helixManager; + @VisibleForTesting + protected GobblinHelixMultiManager multiManager; - private volatile boolean stopInProgress = false; + private StopStatus stopStatus = new StopStatus(false); protected ServiceBasedAppLauncher applicationLauncher; @@ -157,13 +142,14 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri private final String clusterName; private final Config config; private final MetricContext metricContext; - private final Metrics metrics; + private final StandardMetrics metrics; + public GobblinClusterManager(String clusterName, String applicationId, Config config, Optional<Path> appWorkDirOptional) throws Exception { this.clusterName = clusterName; this.config = config; this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass()); - this.metrics = new Metrics(this.metricContext, this.config); + this.metrics = new StandardMetrics(); this.isStandaloneMode = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY, GobblinClusterConfigurationKeys.DEFAULT_STANDALONE_CLUSTER_MODE); @@ -198,7 +184,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri this.jobCatalog = (MutableJobCatalog) GobblinConstructorUtils.invokeFirstConstructor(Class.forName(jobCatalogClassName), - ImmutableList.<Object>of(config + ImmutableList.of(config .getConfig(StringUtils.removeEnd(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX, ".")) .withFallback(this.config))); } else { @@ -243,58 +229,6 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri } /** - * Handle leadership change. - * The applicationLauncher is only started on the leader. - * The leader cleans up existing jobs before starting the applicationLauncher. - * @param changeContext notification context - */ - @VisibleForTesting - void handleLeadershipChange(NotificationContext changeContext) { - this.metrics.clusterLeadershipChange.update(1); - if (this.helixManager.isLeader()) { - // can get multiple notifications on a leadership change, so only start the application launcher the first time - // the notification is received - LOGGER.info("Leader notification for {} isLeader {} HM.isLeader {}", this.helixManager.getInstanceName(), - isLeader, this.helixManager.isLeader()); - - if (!isLeader) { - LOGGER.info("New Helix Controller leader {}", this.helixManager.getInstanceName()); - - // Clean up existing jobs - TaskDriver taskDriver = new TaskDriver(this.helixManager); - Map<String, WorkflowConfig> workflows = taskDriver.getWorkflows(); - - for (Map.Entry<String, WorkflowConfig> entry : workflows.entrySet()) { - String queueName = entry.getKey(); - WorkflowConfig workflowConfig = entry.getValue(); - - // request delete if not already requested - if (workflowConfig.getTargetState() != TargetState.DELETE) { - taskDriver.delete(queueName); - - LOGGER.info("Requested delete of queue {}", queueName); - } - } - - startAppLauncherAndServices(); - isLeader = true; - } - } else { - // stop and reinitialize services since they are not restartable - // this prepares them to start when this cluster manager becomes a leader - if (isLeader) { - isLeader = false; - stopAppLauncherAndServices(); - try { - initializeAppLauncherAndServices(); - } catch (Exception e) { - throw new RuntimeException("Exception reinitializing app launcher services ", e); - } - } - } - } - - /** * Start the Gobblin Cluster Manager. */ @Override @@ -302,14 +236,14 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri LOGGER.info("Starting the Gobblin Cluster Manager"); this.eventBus.register(this); - connectHelixManager(); + this.multiManager.connect(); if (this.isStandaloneMode) { // standalone mode starts non-daemon threads later, so need to have this thread to keep process up this.idleProcessThread = new Thread(new Runnable() { @Override public void run() { - while (!GobblinClusterManager.this.stopInProgress && !GobblinClusterManager.this.stopIdleProcessThread) { + while (!GobblinClusterManager.this.stopStatus.isStopInProgress() && !GobblinClusterManager.this.stopIdleProcessThread) { try { Thread.sleep(300); } catch (InterruptedException e) { @@ -340,11 +274,11 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri */ @Override public synchronized void stop() { - if (this.stopInProgress) { + if (this.stopStatus.isStopInProgress()) { return; } - this.stopInProgress = true; + this.stopStatus.setStopInprogress(true); LOGGER.info("Stopping the Gobblin Cluster Manager"); @@ -364,7 +298,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri stopAppLauncherAndServices(); - disconnectHelixManager(); + this.multiManager.disconnect(); } /** @@ -377,17 +311,6 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri } /** - * Build the {@link HelixManager} for the Application Master. - */ - private HelixManager buildHelixManager(Config config, String zkConnectionString) { - String helixInstanceName = ConfigUtils.getString(config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, - GobblinClusterManager.class.getSimpleName()); - return HelixManagerFactory.getZKHelixManager( - config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY), helixInstanceName, - InstanceType.CONTROLLER, zkConnectionString); - } - - /** * Build the {@link FileSystem} for the Application Master. */ private FileSystem buildFileSystem(Config config) throws IOException { @@ -402,7 +325,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri private GobblinHelixJobScheduler buildGobblinHelixJobScheduler(Config config, Path appWorkDir, List<? extends Tag<?>> metadataTags, SchedulerService schedulerService) throws Exception { Properties properties = ConfigUtils.configToProperties(config); - return new GobblinHelixJobScheduler(properties, this.helixManager, this.eventBus, appWorkDir, metadataTags, + return new GobblinHelixJobScheduler(properties, this.multiManager.getJobClusterHelixManager(), this.eventBus, appWorkDir, metadataTags, schedulerService, this.jobCatalog); } @@ -440,33 +363,6 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri return this.eventBus; } - @VisibleForTesting - void connectHelixManager() { - try { - this.isLeader = false; - this.helixManager.connect(); - this.helixManager.addLiveInstanceChangeListener(new GobblinLiveInstanceChangeListener()); - this.helixManager.getMessagingService() - .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, new ControllerShutdownMessageHandlerFactory()); - this.helixManager.getMessagingService() - .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), - getUserDefinedMessageHandlerFactory()); - - // standalone mode listens for controller change - if (this.isStandaloneMode) { - // Subscribe to leadership changes - this.helixManager.addControllerListener(new ControllerChangeListener() { - @Override - public void onControllerChange(NotificationContext changeContext) { - handleLeadershipChange(changeContext); - } - }); - } - } catch (Exception e) { - LOGGER.error("HelixManager failed to connect", e); - throw Throwables.propagate(e); - } - } /** * Creates and returns a {@link MessageHandlerFactory} for handling of Helix @@ -475,28 +371,38 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri * @returns a {@link MessageHandlerFactory}. */ protected MessageHandlerFactory getUserDefinedMessageHandlerFactory() { - return new ControllerUserDefinedMessageHandlerFactory(); + return new GobblinHelixMultiManager.ControllerUserDefinedMessageHandlerFactory(); + } + + @VisibleForTesting + void connectHelixManager() { + this.multiManager.connect(); } @VisibleForTesting void disconnectHelixManager() { - if (isHelixManagerConnected()) { - this.helixManager.disconnect(); - } + this.multiManager.disconnect(); } @VisibleForTesting boolean isHelixManagerConnected() { - return this.helixManager.isConnected(); + return this.multiManager.isConnected(); } + /** + * In separate controller mode, one controller will manage manager's HA, the other will handle the job dispatching and + * work unit assignment. + */ @VisibleForTesting void initializeHelixManager() { - String zkConnectionString = this.config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); - LOGGER.info("Using ZooKeeper connection string: " + zkConnectionString); - - // This will create and register a Helix controller in ZooKeeper - this.helixManager = buildHelixManager(this.config, zkConnectionString); + this.multiManager = new GobblinHelixMultiManager( + this.config, new Function<Void, MessageHandlerFactory>() { + @Override + public MessageHandlerFactory apply(Void aVoid) { + return GobblinClusterManager.this.getUserDefinedMessageHandlerFactory(); + } + }, this.eventBus, stopStatus) ; + this.multiManager.addLeadershipChangeAwareComponent(this); } @VisibleForTesting @@ -525,7 +431,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri // for messaging to instances //int messagesSent = this.helixManager.getMessagingService().send(criteria, shutdownRequest, // new NoopReplyHandler(), timeout); - GobblinHelixMessagingService messagingService = new GobblinHelixMessagingService(this.helixManager); + GobblinHelixMessagingService messagingService = new GobblinHelixMessagingService(this.multiManager.getJobClusterHelixManager()); int messagesSent = messagingService.send(criteria, shutdownRequest, new NoopReplyHandler(), timeout); @@ -556,179 +462,6 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri } /** - * A custom implementation of {@link LiveInstanceChangeListener}. - */ - private static class GobblinLiveInstanceChangeListener implements LiveInstanceChangeListener { - - @Override - public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) { - for (LiveInstance liveInstance : liveInstances) { - LOGGER.info("Live Helix participant instance: " + liveInstance.getInstanceName()); - } - } - } - - private class Metrics extends StandardMetrics { - public static final String CLUSTER_LEADERSHIP_CHANGE = "clusterLeadershipChange"; - private ContextAwareHistogram clusterLeadershipChange; - public Metrics(final MetricContext metricContext, final Config config) { - int timeWindowSizeInMinutes = ConfigUtils.getInt(config, ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES); - this.clusterLeadershipChange = metricContext.contextAwareHistogram(CLUSTER_LEADERSHIP_CHANGE, timeWindowSizeInMinutes, TimeUnit.MINUTES); - this.contextAwareMetrics.add(clusterLeadershipChange); - } - - @Override - public String getName() { - return GobblinClusterManager.class.getName(); - } - } - - /** - * A custom {@link MessageHandlerFactory} for {@link MessageHandler}s that handle messages of type - * "SHUTDOWN" for shutting down the controller. - */ - private class ControllerShutdownMessageHandlerFactory implements MessageHandlerFactory { - - @Override - public MessageHandler createHandler(Message message, NotificationContext context) { - return new ControllerShutdownMessageHandler(message, context); - } - - @Override - public String getMessageType() { - return GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE; - } - - public List<String> getMessageTypes() { - return Collections.singletonList(getMessageType()); - } - - @Override - public void reset() { - - } - - /** - * A custom {@link MessageHandler} for handling messages of sub type - * {@link HelixMessageSubTypes#APPLICATION_MASTER_SHUTDOWN}. - */ - private class ControllerShutdownMessageHandler extends MessageHandler { - - public ControllerShutdownMessageHandler(Message message, NotificationContext context) { - super(message, context); - } - - @Override - public HelixTaskResult handleMessage() throws InterruptedException { - String messageSubType = this._message.getMsgSubType(); - Preconditions.checkArgument( - messageSubType.equalsIgnoreCase(HelixMessageSubTypes.APPLICATION_MASTER_SHUTDOWN.toString()), - String.format("Unknown %s message subtype: %s", GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, messageSubType)); - - HelixTaskResult result = new HelixTaskResult(); - - if (stopInProgress) { - result.setSuccess(true); - return result; - } - - LOGGER.info("Handling message " + HelixMessageSubTypes.APPLICATION_MASTER_SHUTDOWN.toString()); - - ScheduledExecutorService shutdownMessageHandlingCompletionWatcher = - MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1)); - - // Schedule the task for watching on the removal of the shutdown message, which indicates that - // the message has been successfully processed and it's safe to disconnect the HelixManager. - // This is a hacky way of watching for the completion of processing the shutdown message and - // should be replaced by a fix to https://issues.apache.org/jira/browse/HELIX-611. - shutdownMessageHandlingCompletionWatcher.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - HelixManager helixManager = _notificationContext.getManager(); - HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor(); - - HelixProperty helixProperty = helixDataAccessor - .getProperty(_message.getKey(helixDataAccessor.keyBuilder(), helixManager.getInstanceName())); - // The absence of the shutdown message indicates it has been removed - if (helixProperty == null) { - eventBus.post(new ClusterManagerShutdownRequest()); - } - } - }, 0, 1, TimeUnit.SECONDS); - - result.setSuccess(true); - return result; - } - - @Override - public void onError(Exception e, ErrorCode code, ErrorType type) { - LOGGER.error( - String.format("Failed to handle message with exception %s, error code %s, error type %s", e, code, type)); - } - } - } - - /** - * A custom {@link MessageHandlerFactory} for {@link ControllerUserDefinedMessageHandler}s that - * handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}. - */ - private static class ControllerUserDefinedMessageHandlerFactory implements MessageHandlerFactory { - - @Override - public MessageHandler createHandler(Message message, NotificationContext context) { - return new ControllerUserDefinedMessageHandler(message, context); - } - - @Override - public String getMessageType() { - return Message.MessageType.USER_DEFINE_MSG.toString(); - } - - public List<String> getMessageTypes() { - return Collections.singletonList(getMessageType()); - } - - @Override - public void reset() { - - } - - /** - * A custom {@link MessageHandler} for handling user-defined messages to the controller. - * - * <p> - * Currently does not handle any user-defined messages. If this class is passed a custom message, it will simply - * print out a warning and return successfully. Sub-classes of {@link GobblinClusterManager} should override - * {@link #getUserDefinedMessageHandlerFactory}. - * </p> - */ - private static class ControllerUserDefinedMessageHandler extends MessageHandler { - - public ControllerUserDefinedMessageHandler(Message message, NotificationContext context) { - super(message, context); - } - - @Override - public HelixTaskResult handleMessage() throws InterruptedException { - LOGGER.warn(String - .format("No handling setup for %s message of subtype: %s", Message.MessageType.USER_DEFINE_MSG.toString(), - this._message.getMsgSubType())); - - HelixTaskResult helixTaskResult = new HelixTaskResult(); - helixTaskResult.setSuccess(true); - return helixTaskResult; - } - - @Override - public void onError(Exception e, ErrorCode code, ErrorType type) { - LOGGER.error( - String.format("Failed to handle message with exception %s, error code %s, error type %s", e, code, type)); - } - } - } - - - /** * TODO for now the cluster id is hardcoded to 1 both here and in the {@link GobblinTaskRunner}. In the future, the * cluster id should be created by the {@link GobblinClusterManager} and passed to each {@link GobblinTaskRunner} via * Helix (at least that would be the easiest approach, there are certainly others ways to do it). @@ -799,4 +532,34 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri System.exit(1); } } + + @Override + public void becomeActive() { + startAppLauncherAndServices(); + } + + @Override + public void becomeStandby() { + stopAppLauncherAndServices(); + try { + initializeAppLauncherAndServices(); + } catch (Exception e) { + throw new RuntimeException("Exception reinitializing app launcher services ", e); + } + } + + static class StopStatus { + @Getter + @Setter + AtomicBoolean isStopInProgress; + public StopStatus(boolean inProgress) { + isStopInProgress = new AtomicBoolean(inProgress); + } + public void setStopInprogress (boolean inProgress) { + isStopInProgress.set(inProgress); + } + public boolean isStopInProgress () { + return isStopInProgress.get(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/15b7cd08/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java new file mode 100644 index 0000000..f659978 --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.gobblin.cluster; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.apache.helix.ControllerChangeListener; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.HelixProperty; +import org.apache.helix.InstanceType; +import org.apache.helix.LiveInstanceChangeListener; +import org.apache.helix.NotificationContext; +import org.apache.helix.messaging.handling.HelixTaskResult; +import org.apache.helix.messaging.handling.MessageHandler; +import org.apache.helix.messaging.handling.MessageHandlerFactory; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.Message; +import org.apache.helix.task.TargetState; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.WorkflowConfig; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.common.eventbus.EventBus; +import com.google.common.util.concurrent.MoreExecutors; +import com.typesafe.config.Config; + +import javax.annotation.Nonnull; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.instrumented.StandardMetricsBridge; +import org.apache.gobblin.metrics.ContextAwareHistogram; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.util.ConfigUtils; + +/** + * Encapsulate all Helix related components: controller, participants, etc. + * Provide all kinds of callbacks, listeners, message handlers that each Helix components need to register. + */ +@Slf4j +public class GobblinHelixMultiManager implements StandardMetricsBridge { + + /** + * Helix manager to handle cluster manager leader election. + * Corresponds to cluster with key name {@link GobblinClusterConfigurationKeys#MANAGER_CLUSTER_NAME_KEY} iff dedicatedManagerCluster is true. + * Corresponds to cluster with key name {@link GobblinClusterConfigurationKeys#HELIX_CLUSTER_NAME_KEY} iff dedicatedManagerCluster is false. + */ + @Getter + private HelixManager managerClusterHelixManager = null; + + /** + * Helix manager to handle job distribution. + * Corresponds to cluster with key name {@link GobblinClusterConfigurationKeys#HELIX_CLUSTER_NAME_KEY}. + */ + @Getter + private HelixManager jobClusterHelixManager = null; + + /** + * Helix controller for job distribution. Effective only iff below two conditions are established: + * 1. In {@link GobblinHelixMultiManager#dedicatedManagerCluster} mode. + * 2. {@link GobblinHelixMultiManager#dedicatedJobClusterController} is turned on. + * Typically used for unit test and local deployment. + */ + private Optional<HelixManager> jobClusterController = Optional.empty(); + + /** + * Separate manager cluster and job distribution cluster iff this flag is turned on. Otherwise {@link GobblinHelixMultiManager#jobClusterHelixManager} + * is same as {@link GobblinHelixMultiManager#managerClusterHelixManager}. + */ + private boolean dedicatedManagerCluster = false; + + /** + * Create a dedicated controller for job distribution. + */ + private boolean dedicatedJobClusterController = true; + + @Getter + boolean isLeader = false; + boolean isStandaloneMode = false; + private GobblinClusterManager.StopStatus stopStatus; + private Config config; + private EventBus eventBus; + private final MetricContext metricContext; + private final HelixManagerMetrics metrics; + private final MessageHandlerFactory userDefinedMessageHandlerFactory; + private List<LeadershipChangeAwareComponent> leadershipChangeAwareComponents = Lists.newArrayList(); + + public GobblinHelixMultiManager( + Config config, + Function<Void, MessageHandlerFactory> messageHandlerFactoryFunction, + EventBus eventBus, + GobblinClusterManager.StopStatus stopStatus) { + this.config = config; + this.eventBus = eventBus; + this.stopStatus = stopStatus; + this.isStandaloneMode = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY, + GobblinClusterConfigurationKeys.DEFAULT_STANDALONE_CLUSTER_MODE); + this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass()); + this.metrics = new HelixManagerMetrics(this.metricContext, this.config); + this.dedicatedManagerCluster = ConfigUtils.getBoolean(config, + GobblinClusterConfigurationKeys.DEDICATED_MANAGER_CLUSTER_ENABLED,false); + this.userDefinedMessageHandlerFactory = messageHandlerFactoryFunction.apply(null); + initialize(); + } + + protected void addLeadershipChangeAwareComponent (LeadershipChangeAwareComponent component) { + this.leadershipChangeAwareComponents.add(component); + } + + /** + * Build the {@link HelixManager} for the Application Master. + */ + protected static HelixManager buildHelixManager(Config config, String zkConnectionString, String clusterName, InstanceType type) { + String helixInstanceName = ConfigUtils.getString(config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, + GobblinClusterManager.class.getSimpleName()); + return HelixManagerFactory.getZKHelixManager( + config.getString(clusterName), helixInstanceName, type, zkConnectionString); + } + + public void initialize() { + Preconditions.checkArgument(this.config.hasPath(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY)); + String zkConnectionString = this.config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); + log.info("Using ZooKeeper connection string: " + zkConnectionString); + + if (this.dedicatedManagerCluster) { + Preconditions.checkArgument(this.config.hasPath(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY)); + log.info("We will use separate clusters to manage GobblinClusterManager and job distribution."); + // This will create and register a Helix controller in ZooKeeper + this.managerClusterHelixManager = buildHelixManager(this.config, + zkConnectionString, + GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY, + InstanceType.CONTROLLER); + + // This will create a Helix administrator to dispatch jobs to ZooKeeper + this.jobClusterHelixManager = buildHelixManager(this.config, + zkConnectionString, + GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY, + InstanceType.ADMINISTRATOR); + + // This will create a dedicated controller for job distribution + this.dedicatedJobClusterController = ConfigUtils.getBoolean( + this.config, + GobblinClusterConfigurationKeys.DEDICATED_JOB_CLUSTER_CONTROLLER_ENABLED, + true); + + if (this.dedicatedJobClusterController) { + this.jobClusterController = Optional.of(GobblinHelixMultiManager + .buildHelixManager(this.config, zkConnectionString, GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY, + InstanceType.CONTROLLER)); + } + } else { + log.info("We will use same cluster to manage GobblinClusterManager and job distribution."); + // This will create and register a Helix controller in ZooKeeper + this.managerClusterHelixManager = buildHelixManager(this.config, + zkConnectionString, + GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY, + InstanceType.CONTROLLER); + this.jobClusterHelixManager = this.managerClusterHelixManager; + } + } + + @VisibleForTesting + protected void connect() { + try { + this.isLeader = false; + this.managerClusterHelixManager.connect(); + if (this.dedicatedManagerCluster) { + if (jobClusterController.isPresent()) { + this.jobClusterController.get().connect(); + } + this.jobClusterHelixManager.connect(); + } + + this.jobClusterHelixManager.addLiveInstanceChangeListener(new GobblinLiveInstanceChangeListener()); + this.jobClusterHelixManager.getMessagingService() + .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), userDefinedMessageHandlerFactory); + + this.jobClusterHelixManager.getMessagingService() + .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, new ControllerShutdownMessageHandlerFactory()); + // standalone mode listens for controller change + if (this.isStandaloneMode) { + // Subscribe to leadership changes + this.managerClusterHelixManager.addControllerListener(new ControllerChangeListener() { + @Override + public void onControllerChange(NotificationContext changeContext) { + handleLeadershipChange(changeContext); + } + }); + } + } catch (Exception e) { + log.error("HelixManager failed to connect", e); + throw Throwables.propagate(e); + } + } + + protected boolean isConnected() { + return managerClusterHelixManager.isConnected() && jobClusterHelixManager.isConnected(); + } + + protected void disconnect() { + if (managerClusterHelixManager.isConnected()) { + this.managerClusterHelixManager.disconnect(); + } + + if (this.dedicatedManagerCluster) { + if (jobClusterHelixManager.isConnected()) { + this.jobClusterHelixManager.disconnect(); + } + + if (jobClusterController.isPresent() && jobClusterController.get().isConnected()) { + this.jobClusterController.get().disconnect(); + } + + } + } + + /** + * A custom implementation of {@link LiveInstanceChangeListener}. + */ + private static class GobblinLiveInstanceChangeListener implements LiveInstanceChangeListener { + + @Override + public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) { + for (LiveInstance liveInstance : liveInstances) { + log.info("Live Helix participant instance: " + liveInstance.getInstanceName()); + } + } + } + + /** + * Handle leadership change. + * The applicationLauncher is only started on the leader. + * The leader cleans up existing jobs before starting the applicationLauncher. + * @param changeContext notification context + */ + @VisibleForTesting + void handleLeadershipChange(NotificationContext changeContext) { + this.metrics.clusterLeadershipChange.update(1); + if (this.managerClusterHelixManager.isLeader()) { + // can get multiple notifications on a leadership change, so only start the application launcher the first time + // the notification is received + log.info("Leader notification for {} isLeader {} HM.isLeader {}", + managerClusterHelixManager.getInstanceName(), + isLeader, + managerClusterHelixManager.isLeader()); + + if (!isLeader) { + log.info("New Helix Controller leader {}", this.managerClusterHelixManager.getInstanceName()); + + // Clean up existing jobs + TaskDriver taskDriver = new TaskDriver(this.jobClusterHelixManager); + Map<String, WorkflowConfig> workflows = taskDriver.getWorkflows(); + + for (Map.Entry<String, WorkflowConfig> entry : workflows.entrySet()) { + String queueName = entry.getKey(); + WorkflowConfig workflowConfig = entry.getValue(); + + // request delete if not already requested + if (workflowConfig.getTargetState() != TargetState.DELETE) { + taskDriver.delete(queueName); + + log.info("Requested delete of queue {}", queueName); + } + } + + for (LeadershipChangeAwareComponent c: this.leadershipChangeAwareComponents) { + c.becomeActive(); + } + + isLeader = true; + } + } else { + // stop and reinitialize services since they are not restartable + // this prepares them to start when this cluster manager becomes a leader + if (isLeader) { + isLeader = false; + for (LeadershipChangeAwareComponent c: this.leadershipChangeAwareComponents) { + c.becomeStandby(); + } + } + } + } + + + /** + * A custom {@link MessageHandlerFactory} for {@link MessageHandler}s that handle messages of type + * "SHUTDOWN" for shutting down the controller. + */ + private class ControllerShutdownMessageHandlerFactory implements MessageHandlerFactory { + + @Override + public MessageHandler createHandler(Message message, NotificationContext context) { + return new ControllerShutdownMessageHandler(message, context); + } + + @Override + public String getMessageType() { + return GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE; + } + + public List<String> getMessageTypes() { + return Collections.singletonList(getMessageType()); + } + + @Override + public void reset() { + + } + + /** + * A custom {@link MessageHandler} for handling messages of sub type + * {@link HelixMessageSubTypes#APPLICATION_MASTER_SHUTDOWN}. + */ + private class ControllerShutdownMessageHandler extends MessageHandler { + + public ControllerShutdownMessageHandler(Message message, NotificationContext context) { + super(message, context); + } + + @Override + public HelixTaskResult handleMessage() throws InterruptedException { + String messageSubType = this._message.getMsgSubType(); + Preconditions.checkArgument( + messageSubType.equalsIgnoreCase(HelixMessageSubTypes.APPLICATION_MASTER_SHUTDOWN.toString()), + String.format("Unknown %s message subtype: %s", GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, messageSubType)); + + HelixTaskResult result = new HelixTaskResult(); + + if (stopStatus.isStopInProgress()) { + result.setSuccess(true); + return result; + } + + log.info("Handling message " + HelixMessageSubTypes.APPLICATION_MASTER_SHUTDOWN.toString()); + + ScheduledExecutorService shutdownMessageHandlingCompletionWatcher = + MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1)); + + // Schedule the task for watching on the removal of the shutdown message, which indicates that + // the message has been successfully processed and it's safe to disconnect the HelixManager. + // This is a hacky way of watching for the completion of processing the shutdown message and + // should be replaced by a fix to https://issues.apache.org/jira/browse/HELIX-611. + shutdownMessageHandlingCompletionWatcher.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + HelixManager helixManager = _notificationContext.getManager(); + HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor(); + + HelixProperty helixProperty = helixDataAccessor + .getProperty(_message.getKey(helixDataAccessor.keyBuilder(), helixManager.getInstanceName())); + // The absence of the shutdown message indicates it has been removed + if (helixProperty == null) { + eventBus.post(new ClusterManagerShutdownRequest()); + } + } + }, 0, 1, TimeUnit.SECONDS); + + result.setSuccess(true); + return result; + } + + @Override + public void onError(Exception e, ErrorCode code, ErrorType type) { + log.error( + String.format("Failed to handle message with exception %s, error code %s, error type %s", e, code, type)); + } + } + } + + /** + * A custom {@link MessageHandlerFactory} for {@link ControllerUserDefinedMessageHandler}s that + * handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}. + */ + static class ControllerUserDefinedMessageHandlerFactory implements MessageHandlerFactory { + + @Override + public MessageHandler createHandler(Message message, NotificationContext context) { + return new ControllerUserDefinedMessageHandler(message, context); + } + + @Override + public String getMessageType() { + return Message.MessageType.USER_DEFINE_MSG.toString(); + } + + public List<String> getMessageTypes() { + return Collections.singletonList(getMessageType()); + } + + @Override + public void reset() { + + } + + /** + * A custom {@link MessageHandler} for handling user-defined messages to the controller. + * + * <p> + * Currently does not handle any user-defined messages. If this class is passed a custom message, it will simply + * print out a warning and return successfully. Sub-classes of {@link GobblinClusterManager} should override + * {@link GobblinClusterManager#getUserDefinedMessageHandlerFactory()}. + * </p> + */ + private static class ControllerUserDefinedMessageHandler extends MessageHandler { + + public ControllerUserDefinedMessageHandler(Message message, NotificationContext context) { + super(message, context); + } + + @Override + public HelixTaskResult handleMessage() throws InterruptedException { + log.warn(String + .format("No handling setup for %s message of subtype: %s", Message.MessageType.USER_DEFINE_MSG.toString(), + this._message.getMsgSubType())); + + HelixTaskResult helixTaskResult = new HelixTaskResult(); + helixTaskResult.setSuccess(true); + return helixTaskResult; + } + + @Override + public void onError(Exception e, ErrorCode code, ErrorType type) { + log.error( + String.format("Failed to handle message with exception %s, error code %s, error type %s", e, code, type)); + } + } + } + + + /** + * Helix related metrics + */ + private class HelixManagerMetrics extends StandardMetricsBridge.StandardMetrics { + public static final String CLUSTER_LEADERSHIP_CHANGE = "clusterLeadershipChange"; + private ContextAwareHistogram clusterLeadershipChange; + public HelixManagerMetrics(final MetricContext metricContext, final Config config) { + int timeWindowSizeInMinutes = ConfigUtils.getInt(config, ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES); + this.clusterLeadershipChange = metricContext.contextAwareHistogram(CLUSTER_LEADERSHIP_CHANGE, timeWindowSizeInMinutes, TimeUnit.MINUTES); + this.contextAwareMetrics.add(clusterLeadershipChange); + } + + @Override + public String getName() { + return GobblinClusterManager.class.getName(); + } + } + + @Nonnull + @Override + public MetricContext getMetricContext() { + return this.metricContext; + } + + @Override + public boolean isInstrumentationEnabled() { + return true; + } + + @Override + public StandardMetrics getStandardMetrics() { + return this.metrics; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/15b7cd08/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/LeadershipChangeAwareComponent.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/LeadershipChangeAwareComponent.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/LeadershipChangeAwareComponent.java new file mode 100644 index 0000000..103e7ff --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/LeadershipChangeAwareComponent.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.gobblin.cluster; + +public interface LeadershipChangeAwareComponent { + void becomeActive(); + void becomeStandby(); +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/15b7cd08/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java index 3e40363..58a5210 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java @@ -73,7 +73,7 @@ public class ClusterIntegrationTest { private GobblinTaskRunner _worker; private GobblinClusterManager _manager; private boolean _runTaskInSeparateProcess; - + private boolean _dedicatedClusterManager = false; @Test public void simpleJobShouldComplete() throws Exception { @@ -87,6 +87,13 @@ public class ClusterIntegrationTest { runSimpleJobAndVerifyResult(); } + @Test + public void dedicatedManagerClusterMode() + throws Exception { + _dedicatedClusterManager = true; + runSimpleJobAndVerifyResult(); + } + private void runSimpleJobAndVerifyResult() throws Exception { init(); @@ -154,6 +161,10 @@ public class ClusterIntegrationTest { if (_runTaskInSeparateProcess) { configMap.put(GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS, "true"); } + if (_dedicatedClusterManager) { + configMap.put(GobblinClusterConfigurationKeys.DEDICATED_MANAGER_CLUSTER_ENABLED, "true"); + configMap.put(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY, "ManagerCluster"); + } Config config = ConfigFactory.parseMap(configMap); return config; } @@ -175,12 +186,18 @@ public class ClusterIntegrationTest { } } - private void createHelixCluster() { + private void createHelixCluster() throws Exception { String zkConnectionString = _config .getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); String helix_cluster_name = _config .getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY); HelixUtils.createGobblinHelixCluster(zkConnectionString, helix_cluster_name); + + if (_dedicatedClusterManager) { + String manager_cluster_name = _config + .getString(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY); + HelixUtils.createGobblinHelixCluster(zkConnectionString, manager_cluster_name); + } } private void startCluster() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/15b7cd08/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java index 11b8808..3fde3e7 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java @@ -280,7 +280,7 @@ public class GobblinClusterKillTest { // need to reinitialize the heap manager and call handleLeadershipChange to shut down services in the test // since the leadership change is simulated _clusterManagers[0].initializeHelixManager(); - _clusterManagers[0].handleLeadershipChange(null); + _clusterManagers[0].multiManager.handleLeadershipChange(null); // reconnect to get leadership role _clusterManagers[0].connectHelixManager(); @@ -318,7 +318,7 @@ public class GobblinClusterKillTest { public void tearDown() throws IOException, InterruptedException { for (int i = 0; i < NUM_MANAGERS; i++) { - _clusterManagers[i].connectHelixManager(); + _clusterManagers[i].multiManager.connect(); if (!_clusterManagers[i].isHelixManagerConnected()) { _clusterManagers[i].connectHelixManager(); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/15b7cd08/gobblin-cluster/src/test/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/resources/log4j.xml b/gobblin-cluster/src/test/resources/log4j.xml index 32367af..a5a3d93 100644 --- a/gobblin-cluster/src/test/resources/log4j.xml +++ b/gobblin-cluster/src/test/resources/log4j.xml @@ -23,18 +23,18 @@ <param name="Target" value="System.out" /> <layout class="org.apache.log4j.PatternLayout"> <param name="ConversionPattern" - value="%d{yyyy-MM-dd HH:mm:ss z} %-5p [%t] %C %L - %m%n" /> + value="%d{yyyy-MM-dd HH:mm:ss z} %-5p [%t] %C %L - %m%n" /> </layout> </appender> - <!--<logger name="org.apache.hadoop.conf.Configuration">--> - <!--<level value="FATAL"/>--> - <!--</logger>--> - <root> - <level value="warn" /> + <level value="info" /> <appender-ref ref="console" /> </root> + <!-- Swallow annoying exceptions when creating a configuration. --> + <logger name="org.apache.hadoop.conf.Configuration"> + <level value="FATAL"/> + </logger> </log4j:configuration>
