This is an automated email from the ASF dual-hosted git repository. suvasude pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push: new cf6af89 [GOBBLIN-1099] Handle orphaned Yarn containers in Gobblin-on-Yarn clus… cf6af89 is described below commit cf6af89995369e80afbd661ef0d7d2852d1ca4c9 Author: sv2000 <sudarsh...@gmail.com> AuthorDate: Fri Mar 27 21:26:00 2020 -0700 [GOBBLIN-1099] Handle orphaned Yarn containers in Gobblin-on-Yarn clus… Closes #2940 from sv2000/yarnOrphans --- .../gobblin/cluster/GobblinClusterManager.java | 8 +- .../apache/gobblin/cluster/GobblinTaskRunner.java | 93 ++++++++++++++++++---- .../org/apache/gobblin/cluster/HelixUtils.java | 32 ++++++++ .../gobblin/cluster/GobblinTaskRunnerTest.java | 47 ++++++++++- .../gobblin/yarn/GobblinApplicationMaster.java | 3 +- .../gobblin/yarn/GobblinYarnAppLauncher.java | 43 +++++++--- .../java/org/apache/gobblin/yarn/YarnService.java | 45 ++++++++--- .../gobblin/yarn/GobblinYarnAppLauncherTest.java | 2 +- .../org/apache/gobblin/yarn/YarnServiceTest.java | 20 ++--- 9 files changed, 237 insertions(+), 56 deletions(-) diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java index 8eabad1..2c3f49f 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java @@ -56,6 +56,7 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; +import lombok.AccessLevel; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -108,6 +109,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri protected ServiceBasedAppLauncher applicationLauncher; // An EventBus used for communications between services running in the ApplicationMaster + @Getter(AccessLevel.PUBLIC) protected final EventBus eventBus = new 
EventBus(GobblinClusterManager.class.getSimpleName()); protected final Path appWorkDir; @@ -400,12 +402,6 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri stop(); } - @VisibleForTesting - EventBus getEventBus() { - return this.eventBus; - } - - /** * Creates and returns a {@link MessageHandlerFactory} for handling of Helix * {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}s. diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java index b9f1c96..2d35dea 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -42,12 +44,15 @@ import org.apache.commons.cli.ParseException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.HelixProperty; import org.apache.helix.InstanceType; import org.apache.helix.NotificationContext; +import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.messaging.handling.HelixTaskResult; import org.apache.helix.messaging.handling.MessageHandler; import org.apache.helix.messaging.handling.MessageHandlerFactory; @@ -57,6 +62,10 @@ import org.apache.helix.task.TaskStateModelFactory; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.StopStrategies; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -121,6 +130,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge { private final String clusterName; + @Getter private HelixManager jobHelixManager; private Optional<HelixManager> taskDriverHelixManager = Optional.absent(); @@ -292,7 +302,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge { // Add a shutdown hook so the task scheduler gets properly shutdown addShutdownHook(); - connectHelixManager(); + connectHelixManagerWithRetry(); addInstanceTags(); @@ -366,21 +376,74 @@ public class GobblinTaskRunner implements StandardMetricsBridge { } @VisibleForTesting - void connectHelixManager() { - try { - this.jobHelixManager.connect(); - this.jobHelixManager.getMessagingService() - .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, - new ParticipantShutdownMessageHandlerFactory()); - this.jobHelixManager.getMessagingService() - .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), - getUserDefinedMessageHandlerFactory()); - if (this.taskDriverHelixManager.isPresent()) { - this.taskDriverHelixManager.get().connect(); + void connectHelixManager() throws Exception { + this.jobHelixManager.connect(); + //Ensure the instance is enabled. 
+ this.jobHelixManager.getClusterManagmentTool().enableInstance(clusterName, helixInstanceName, true); + this.jobHelixManager.getMessagingService() + .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, + new ParticipantShutdownMessageHandlerFactory()); + this.jobHelixManager.getMessagingService() + .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), + getUserDefinedMessageHandlerFactory()); + if (this.taskDriverHelixManager.isPresent()) { + this.taskDriverHelixManager.get().connect(); + //Ensure the instance is enabled. + this.taskDriverHelixManager.get().getClusterManagmentTool().enableInstance(this.taskDriverHelixManager.get().getClusterName(), helixInstanceName, true); + } + } + + /** + * A method to handle failures joining Helix cluster. The method will perform the following steps before attempting + * to re-join the cluster: + * <ul> + * <li>Disconnect from Helix cluster, which would close any open clients</li> + * <li>Drop instance from the job cluster (and from the task driver cluster, if one is present), to remove any partial instance structure from Helix</li> + * <li>Re-construct helix manager instances, used to re-join the cluster</li> + * </ul> + */ + private void onClusterJoinFailure() { + logger.warn("Disconnecting Helix manager.."); + disconnectHelixManager(); + + HelixAdmin admin = new ZKHelixAdmin(clusterConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY)); + //Drop the helix Instance + logger.warn("Dropping instance: {} from cluster: {}", helixInstanceName, clusterName); + HelixUtils.dropInstanceIfExists(admin, clusterName, helixInstanceName); + + if (this.taskDriverHelixManager.isPresent()) { + String taskDriverCluster = clusterConfig.getString(GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY); + logger.warn("Dropping instance: {} from task driver cluster: {}", helixInstanceName, taskDriverCluster); + HelixUtils.dropInstanceIfExists(admin, taskDriverCluster, helixInstanceName); + } + admin.close(); + +
logger.warn("Reinitializing Helix manager.."); + initHelixManager(); + } + + @VisibleForTesting + void connectHelixManagerWithRetry() { + Callable<Void> connectHelixManagerCallable = () -> { + try { + logger.info("Instance: {} attempting to join cluster: {}", helixInstanceName, clusterName); + connectHelixManager(); + } catch (HelixException e) { + logger.error("Exception encountered when joining cluster", e); + onClusterJoinFailure(); + throw e; } - } catch (Exception e) { - logger.error("HelixManager failed to connect", e); - throw Throwables.propagate(e); + return null; + }; + + Retryer<Void> retryer = RetryerBuilder.<Void>newBuilder() + .retryIfException() + .withStopStrategy(StopStrategies.stopAfterAttempt(5)).build(); + + try { + retryer.call(connectHelixManagerCallable); + } catch (ExecutionException | RetryException e) { + Throwables.propagate(e); } } diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java index f6f4730..f70a741 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java @@ -20,6 +20,7 @@ package org.apache.gobblin.cluster; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; @@ -27,9 +28,14 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; import org.apache.helix.HelixManager; +import org.apache.helix.PropertyKey; import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.model.HelixConfigScope; +import org.apache.helix.model.InstanceConfig; import org.apache.helix.task.JobConfig; import 
org.apache.helix.task.TargetState; import org.apache.helix.task.TaskConfig; @@ -331,4 +337,30 @@ public class HelixUtils { System.setProperty(entry.getKey().toString(), entry.getValue().toString()); } } + + /** + * A utility method that returns all current live instances in a given Helix cluster. This method assumes that + * the passed {@link HelixManager} instance is already connected. + * @param helixManager + * @return all live instances in the Helix cluster. + */ + public static List<String> getLiveInstances(HelixManager helixManager) { + HelixDataAccessor accessor = helixManager.getHelixDataAccessor(); + PropertyKey liveInstancesKey = accessor.keyBuilder().liveInstances(); + return accessor.getChildNames(liveInstancesKey); + } + + public static boolean isInstanceLive(HelixManager helixManager, String instanceName) { + HelixDataAccessor accessor = helixManager.getHelixDataAccessor(); + PropertyKey liveInstanceKey = accessor.keyBuilder().liveInstance(instanceName); + return accessor.getProperty(liveInstanceKey) != null; + } + + public static void dropInstanceIfExists(HelixAdmin admin, String clusterName, String helixInstanceName) { + try { + admin.dropInstance(clusterName, new InstanceConfig(helixInstanceName)); + } catch (HelixException e) { + log.error("Could not drop instance: {} due to: {}", helixInstanceName, e); + } + } } \ No newline at end of file diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java index b115607..0e48661 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java @@ -23,6 +23,9 @@ import java.net.URL; import org.apache.curator.test.TestingServer; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.helix.HelixException; +import 
org.apache.helix.PropertyPathBuilder; +import org.apache.helix.manager.zk.ZkClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -60,6 +63,9 @@ public class GobblinTaskRunnerTest { private GobblinTaskRunner gobblinTaskRunner; private GobblinClusterManager gobblinClusterManager; + private GobblinTaskRunner corruptGobblinTaskRunner; + private String clusterName; + private String corruptHelixInstance; @BeforeClass public void setUp() throws Exception { @@ -80,8 +86,8 @@ public class GobblinTaskRunnerTest { .resolve(); String zkConnectionString = config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); - HelixUtils.createGobblinHelixCluster(zkConnectionString, - config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY)); + this.clusterName = config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY); + HelixUtils.createGobblinHelixCluster(zkConnectionString, this.clusterName); // Participant this.gobblinTaskRunner = @@ -89,12 +95,17 @@ public class GobblinTaskRunnerTest { TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, config, Optional.<Path>absent()); this.gobblinTaskRunner.connectHelixManager(); + // Participant with a partial Instance set up on Helix/ZK + this.corruptHelixInstance = HelixUtils.getHelixInstanceName("CorruptHelixInstance", 0); + this.corruptGobblinTaskRunner = + new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, corruptHelixInstance, + TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, config, Optional.<Path>absent()); + // Controller this.gobblinClusterManager = new GobblinClusterManager(TestHelper.TEST_APPLICATION_NAME, TestHelper.TEST_APPLICATION_ID, config, Optional.<Path>absent()); this.gobblinClusterManager.connectHelixManager(); - } @Test @@ -117,6 +128,36 @@ public class GobblinTaskRunnerTest { Assert.assertEquals(fileSystem.getConf().get(HADOOP_OVERRIDE_PROPERTY_NAME), "value"); } + @Test + public void 
testConnectHelixManagerWithRetry() { + //Connect and disconnect the corrupt task runner to create a Helix Instance set up. + try { + this.corruptGobblinTaskRunner.connectHelixManager(); + this.corruptGobblinTaskRunner.disconnectHelixManager(); + } catch (Exception e) { + Assert.fail("Failed to connect to ZK"); + } + + //Delete ERRORS/HISTORY/STATUSUPDATES znodes under INSTANCES to simulate partial instance set up. + ZkClient zkClient = new ZkClient(testingZKServer.getConnectString()); + zkClient.delete(PropertyPathBuilder.instanceError(clusterName, corruptHelixInstance)); + zkClient.delete(PropertyPathBuilder.instanceHistory(clusterName, corruptHelixInstance)); + zkClient.delete(PropertyPathBuilder.instanceStatusUpdate(clusterName, corruptHelixInstance)); + + //Ensure that the connecting to Helix without retry will throw a HelixException + try { + corruptGobblinTaskRunner.connectHelixManager(); + Assert.fail("Unexpected success in connecting to HelixManager"); + } catch (Exception e) { + //Assert that a HelixException is thrown. 
+ Assert.assertTrue(e.getClass().equals(HelixException.class)); + } + + //Ensure that connect with retry succeeds + corruptGobblinTaskRunner.connectHelixManagerWithRetry(); + Assert.assertTrue(true); + } + @AfterClass public void tearDown() throws IOException { try { diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java index 14488d5..b1095bc 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java @@ -119,7 +119,8 @@ public class GobblinApplicationMaster extends GobblinClusterManager { protected YarnService buildYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration, FileSystem fs) throws Exception { - return new YarnService(config, applicationName, applicationId, yarnConfiguration, fs, this.eventBus); + return new YarnService(config, applicationName, applicationId, yarnConfiguration, fs, this.eventBus, + this.getMultiManager().getJobClusterHelixManager()); } /** diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java index a6d7a88..82842f2 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Records; import org.apache.helix.Criteria; +import org.apache.helix.HelixAdmin; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; @@ -319,7 +320,12 @@ public class GobblinYarnAppLauncher { 
this.securityManager.get().loginAndScheduleTokenRenewal(); } - this.applicationId = getApplicationId(); + Optional<ApplicationId> reconnectableApplicationId = getReconnectableApplicationId(); + if (!reconnectableApplicationId.isPresent()) { + disableLiveHelixInstances(); + LOGGER.info("No reconnectable application found so submitting a new application"); + this.applicationId = Optional.of(setupAndSubmitApplication()); + } this.applicationStatusMonitor.scheduleAtFixedRate(new Runnable() { @Override @@ -396,6 +402,9 @@ public class GobblinYarnAppLauncher { stopYarnClient(); + LOGGER.info("Disabling all live Helix instances.."); + disableLiveHelixInstances(); + disconnectHelixManager(); } finally { try { @@ -479,6 +488,26 @@ public class GobblinYarnAppLauncher { } } + /** + * A method to disable pre-existing live instances in a Helix cluster. This can happen when a previous Yarn application + * leaves behind orphaned Yarn worker processes. Since Helix does not provide an API to drop a live instance, we use + * the disable instance API to fence off these orphaned instances and prevent them from becoming participants in the + * new cluster. + * + * NOTE: this is a workaround for an existing YARN bug. Once YARN has a fix to guarantee container kills on application + * completion, this method should be removed. 
+ */ + void disableLiveHelixInstances() { + String clusterName = this.helixManager.getClusterName(); + HelixAdmin helixAdmin = this.helixManager.getClusterManagmentTool(); + List<String> liveInstances = HelixUtils.getLiveInstances(this.helixManager); + LOGGER.warn("Found {} live instances in the cluster.", liveInstances.size()); + for (String instanceName: liveInstances) { + LOGGER.warn("Disabling instance: {}", instanceName); + helixAdmin.enableInstance(clusterName, instanceName, false); + } + } + @VisibleForTesting void disconnectHelixManager() { if (this.helixManager.isConnected()) { @@ -496,17 +525,6 @@ public class GobblinYarnAppLauncher { this.yarnClient.stop(); } - private Optional<ApplicationId> getApplicationId() throws YarnException, IOException { - Optional<ApplicationId> reconnectableApplicationId = getReconnectableApplicationId(); - if (reconnectableApplicationId.isPresent()) { - LOGGER.info("Found reconnectable application with application ID: " + reconnectableApplicationId.get()); - return reconnectableApplicationId; - } - - LOGGER.info("No reconnectable application found so submitting a new application"); - return Optional.of(setupAndSubmitApplication()); - } - @VisibleForTesting Optional<ApplicationId> getReconnectableApplicationId() throws YarnException, IOException { List<ApplicationReport> applicationReports = @@ -518,6 +536,7 @@ public class GobblinYarnAppLauncher { // Try to find an application with a matching application name for (ApplicationReport applicationReport : applicationReports) { if (this.applicationName.equals(applicationReport.getName())) { + LOGGER.info("Found reconnectable application with application ID: " + applicationReport.getApplicationId()); return Optional.of(applicationReport.getApplicationId()); } } diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java index 4586571..5f6da10 100644 --- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.Records; +import org.apache.helix.HelixManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,6 +83,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Queues; +import com.google.common.collect.Sets; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; import com.google.common.io.Closer; @@ -123,6 +125,8 @@ public class YarnService extends AbstractIdleService { private static final Splitter SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults(); + private static final String UNKNOWN_HELIX_INSTANCE = "UNKNOWN"; + private final String applicationName; private final String applicationId; private final String appViewAcl; @@ -156,6 +160,7 @@ public class YarnService extends AbstractIdleService { private final Optional<String> containerJvmArgs; private final String containerTimezone; + private final HelixManager helixManager; private volatile Optional<Resource> maxResourceCapacity = Optional.absent(); @@ -199,9 +204,10 @@ public class YarnService extends AbstractIdleService { @VisibleForTesting @Getter(AccessLevel.PROTECTED) private int numRequestedContainers = 0; + private final Set<String> blacklistedInstances = Sets.newHashSet(); public YarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration, - FileSystem fs, EventBus eventBus) throws Exception { + FileSystem fs, EventBus eventBus, HelixManager helixManager) throws Exception { this.applicationName = applicationName; this.applicationId = 
applicationId; @@ -209,6 +215,8 @@ public class YarnService extends AbstractIdleService { this.eventBus = eventBus; + this.helixManager = helixManager; + this.gobblinMetrics = config.getBoolean(ConfigurationKeys.METRICS_ENABLED_KEY) ? Optional.of(buildGobblinMetrics()) : Optional.<GobblinMetrics>absent(); @@ -625,7 +633,10 @@ public class YarnService extends AbstractIdleService { */ protected void handleContainerCompletion(ContainerStatus containerStatus) { Map.Entry<Container, String> completedContainerEntry = this.containerMap.remove(containerStatus.getContainerId()); - String completedInstanceName = completedContainerEntry == null? "unknown" : completedContainerEntry.getValue(); + //Get the Helix instance name for the completed container. Because callbacks are processed asynchronously, we might + //encounter situations where handleContainerCompletion() is called before onContainersAllocated(), resulting in the + //containerId missing from the containersMap. + String completedInstanceName = completedContainerEntry == null? 
UNKNOWN_HELIX_INSTANCE : completedContainerEntry.getValue(); LOGGER.info(String.format("Container %s running Helix instance %s has completed with exit status %d", containerStatus.getContainerId(), completedInstanceName, containerStatus.getExitStatus())); @@ -635,10 +646,26 @@ public class YarnService extends AbstractIdleService { containerStatus.getContainerId(), containerStatus.getDiagnostics())); } - if (this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != null) { - LOGGER.info("Container release requested, so not spawning a replacement for containerId {}", - containerStatus.getContainerId()); - return; + if (containerStatus.getExitStatus() == ContainerExitStatus.ABORTED) { + if (this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != null) { + LOGGER.info("Container release requested, so not spawning a replacement for containerId {}", containerStatus.getContainerId()); + return; + } else { + LOGGER.info("Container {} aborted due to lost NM", containerStatus.getContainerId()); + // Container release was not requested. Likely, the container was running on a node on which the NM died. + // In this case, RM assumes that the containers are "lost", even though the container process may still be + // running on the node. We need to ensure that the Helix instances running on the orphaned containers + // are fenced off from the Helix cluster to avoid double publishing and state being committed by the + // instances. + if (!UNKNOWN_HELIX_INSTANCE.equals(completedInstanceName)) { + String clusterName = this.helixManager.getClusterName(); + //Disable the orphaned instance. 
+ if (HelixUtils.isInstanceLive(helixManager, completedInstanceName)) { + LOGGER.info("Disabling the Helix instance {}", completedInstanceName); + this.helixManager.getClusterManagmentTool().enableInstance(clusterName, completedInstanceName, false); + } + } + } } if (this.shutdownInProgress) { @@ -723,10 +750,10 @@ public class YarnService extends AbstractIdleService { LOGGER.info(String.format("Container %s has been allocated", container.getId())); String instanceName = unusedHelixInstanceNames.poll(); - if (Strings.isNullOrEmpty(instanceName)) { + while (Strings.isNullOrEmpty(instanceName) || HelixUtils.isInstanceLive(helixManager, instanceName)) { // No unused instance name, so generating a new one. - instanceName = HelixUtils.getHelixInstanceName(HELIX_YARN_INSTANCE_NAME_PREFIX, - helixInstanceIdGenerator.incrementAndGet()); + instanceName = HelixUtils + .getHelixInstanceName(HELIX_YARN_INSTANCE_NAME_PREFIX, helixInstanceIdGenerator.incrementAndGet()); } final String finalInstanceName = instanceName; diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java index adc8bd0..c1c7142 100644 --- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java @@ -480,7 +480,7 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase { private static class TestYarnService extends YarnService { public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { - super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus); + super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus, null); } @Override diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java index 2eb032f..160ad39 100644 --- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java @@ -17,13 +17,6 @@ package org.apache.gobblin.yarn; -import com.google.common.base.Predicate; -import com.google.common.eventbus.EventBus; -import com.google.common.io.Closer; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigValueFactory; - import java.io.IOException; import java.lang.reflect.Field; import java.net.URL; @@ -35,7 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeoutException; -import org.apache.gobblin.testing.AssertWithBackoff; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.security.UserGroupInformation; @@ -64,6 +57,15 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import com.google.common.base.Predicate; +import com.google.common.eventbus.EventBus; +import com.google.common.io.Closer; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.gobblin.testing.AssertWithBackoff; + /** * Tests for {@link YarnService}. 
@@ -282,7 +284,7 @@ public class YarnServiceTest { static class TestYarnService extends YarnService { public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { - super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus); + super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus, null); } protected ContainerLaunchContext newContainerLaunchContext(Container container, String helixInstanceName)