Repository: hadoop Updated Branches: refs/heads/branch-3.1 293c992e8 -> 7ef4ff190
YARN-9071. Improved status update for reinitialized containers. Contributed by Chandni Singh (cherry picked from commit 1b790f4dd1f682423d5dbb8e70c6225cbddce989) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7ef4ff19 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7ef4ff19 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7ef4ff19 Branch: refs/heads/branch-3.1 Commit: 7ef4ff19057bbb449385358ce0bc2c635ab2b5bb Parents: 293c992 Author: Eric Yang <ey...@apache.org> Authored: Wed Dec 5 17:00:56 2018 -0500 Committer: Eric Yang <ey...@apache.org> Committed: Wed Dec 5 19:05:26 2018 -0500 ---------------------------------------------------------------------- .../component/instance/ComponentInstance.java | 76 +++++++++++++------- .../instance/TestComponentInstance.java | 36 ++++++++++ .../container/ContainerImpl.java | 23 ++++-- .../monitor/ContainerMetrics.java | 8 ++- .../monitor/ContainerStopMonitoringEvent.java | 12 ++++ .../monitor/ContainersMonitorImpl.java | 6 +- .../monitor/TestContainerMetrics.java | 6 +- 7 files changed, 131 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ef4ff19/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 86b0e32..0145847 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -154,10 +154,14 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, REINITIALIZED), START, new StartedAfterUpgradeTransition()) .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, INIT), STOP, new StoppedAfterCancelUpgradeTransition()) + + // FROM REINITIALIZED .addTransition(REINITIALIZED, CANCEL_UPGRADING, CANCEL_UPGRADE, new CancelledAfterReinitTransition()) .addTransition(REINITIALIZED, READY, BECOME_READY, new ContainerBecomeReadyTransition(true)) + .addTransition(REINITIALIZED, REINITIALIZED, STOP, + new StoppedAfterUpgradeTransition()) .installTopology(); public ComponentInstance(Component component, @@ -184,20 +188,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, @Override public void transition(ComponentInstance compInstance, ComponentInstanceEvent event) { // Query container status for ip and host - boolean cancelOnSuccess = true; - if (compInstance.getCompSpec().getArtifact() != null && compInstance - .getCompSpec().getArtifact().getType() == Artifact.TypeEnum.DOCKER) { - // A docker container might get a different IP if the container is - // relaunched by the NM, so we need to keep checking the status. - // This is a temporary fix until the NM provides a callback for - // container relaunch (see YARN-8265). 
- cancelOnSuccess = false; - } - compInstance.containerStatusFuture = - compInstance.scheduler.executorService.scheduleAtFixedRate( - new ContainerStatusRetriever(compInstance.scheduler, - event.getContainerId(), compInstance, cancelOnSuccess), 0, 1, - TimeUnit.SECONDS); + compInstance.initializeStatusRetriever(event); long containerStartTime = System.currentTimeMillis(); try { ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils @@ -277,6 +268,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, instance.upgradeInProgress.set(false); instance.setContainerState(ContainerState.RUNNING_BUT_UNREADY); + instance.initializeStatusRetriever(event); Component.UpgradeStatus status = instance.getState().equals(UPGRADING) ? instance.component.getUpgradeStatus() : @@ -572,13 +564,9 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, instance.setContainerState(ContainerState.UPGRADING); instance.component.decContainersReady(false); - Component.UpgradeStatus status = instance.component.getUpgradeStatus(); - instance.scheduler.getContainerLaunchService() - .reInitCompInstance(instance.scheduler.getApp(), instance, - instance.container, - instance.component.createLaunchContext( - status.getTargetSpec(), - status.getTargetVersion())); + Component.UpgradeStatus upgradeStatus = instance.component. 
+ getUpgradeStatus(); + instance.reInitHelper(upgradeStatus); } } @@ -634,11 +622,35 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, LOG.info("{} cancelling upgrade", container.getId()); setContainerState(ContainerState.UPGRADING); Component.UpgradeStatus cancelStatus = component.getCancelUpgradeStatus(); + reInitHelper(cancelStatus); + } + + private void reInitHelper(Component.UpgradeStatus upgradeStatus) { + cancelContainerStatusRetriever(); + setContainerStatus(null); + scheduler.executorService.submit(() -> cleanupRegistry(container.getId())); scheduler.getContainerLaunchService() .reInitCompInstance(scheduler.getApp(), this, this.container, this.component.createLaunchContext( - cancelStatus.getTargetSpec(), - cancelStatus.getTargetVersion())); + upgradeStatus.getTargetSpec(), + upgradeStatus.getTargetVersion())); + } + + private void initializeStatusRetriever(ComponentInstanceEvent event) { + boolean cancelOnSuccess = true; + if (getCompSpec().getArtifact() != null && + getCompSpec().getArtifact().getType() == Artifact.TypeEnum.DOCKER) { + // A docker container might get a different IP if the container is + // relaunched by the NM, so we need to keep checking the status. + // This is a temporary fix until the NM provides a callback for + // container relaunch (see YARN-8265). 
+ cancelOnSuccess = false; + } + containerStatusFuture = + scheduler.executorService.scheduleAtFixedRate( + new ContainerStatusRetriever(scheduler, event.getContainerId(), + this, cancelOnSuccess), 0, 1, + TimeUnit.SECONDS); } public ComponentInstanceState getState() { @@ -725,11 +737,25 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, } public ContainerStatus getContainerStatus() { - return status; + try { + readLock.lock(); + return status; + } finally { + readLock.unlock(); + } + } + + private void setContainerStatus(ContainerStatus latestStatus) { + try { + writeLock.lock(); + this.status = latestStatus; + } finally { + writeLock.unlock(); + } } public void updateContainerStatus(ContainerStatus status) { - this.status = status; + setContainerStatus(status); org.apache.hadoop.yarn.service.api.records.Container container = getCompSpec().getContainer(status.getContainerId().toString()); boolean doRegistryUpdate = true; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ef4ff19/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java index 09652d7..f857353 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java @@ -141,6 +141,42 @@ public class TestComponentInstance { } @Test + public void testFailureAfterReinit() throws Exception { + ServiceContext context = TestComponent.createTestContext(rule, + "testContainerUpgradeFailed"); + Component component = context.scheduler.getAllComponents().entrySet() + .iterator().next().getValue(); + upgradeComponent(component); + + ComponentInstance instance = component.getAllComponentInstances().iterator() + .next(); + + ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE); + instance.handle(upgradeEvent); + + // NM finished upgrade + instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(), + ComponentInstanceEventType.START)); + Assert.assertEquals("instance not running", + ContainerState.RUNNING_BUT_UNREADY, + component.getComponentSpec().getContainer(instance.getContainer() + .getId().toString()).getState()); + + ContainerStatus containerStatus = mock(ContainerStatus.class); + when(containerStatus.getExitStatus()).thenReturn( + ContainerExitStatus.ABORTED); + ComponentInstanceEvent stopEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), ComponentInstanceEventType.STOP) + .setStatus(containerStatus); + // this is the call back from NM for the upgrade + instance.handle(stopEvent); + Assert.assertEquals("instance did not fail", ContainerState.FAILED_UPGRADE, + component.getComponentSpec().getContainer(instance.getContainer() + .getId().toString()).getState()); + } + + @Test public void testCancelNothingToUpgrade() throws Exception { ServiceContext context = TestComponent.createTestContext(rule, "testCancelUpgradeWhenContainerReady"); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ef4ff19/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index d52b133..be7bd72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -929,10 +929,21 @@ public class ContainerImpl implements Container { this.workDir = workDir; } + private void clearIpAndHost() { + LOG.info("{} clearing ip and host", containerId); + this.ips = null; + this.host = null; + } + @Override public void setIpAndHost(String[] ipAndHost) { - this.ips = ipAndHost[0]; - this.host = ipAndHost[1]; + try { + this.writeLock.lock(); + this.ips = ipAndHost[0]; + this.host = ipAndHost[1]; + } finally { + this.writeLock.unlock(); + } } @Override @@ -1722,7 +1733,11 @@ public class ContainerImpl implements Container { + "] for re-initialization !!"); container.wasLaunched = false; container.metrics.endRunningContainer(); - + container.clearIpAndHost(); + // Remove the container from the resource-monitor. When container + // is launched again, it is added back to monitoring service. 
+ container.dispatcher.getEventHandler().handle( + new ContainerStopMonitoringEvent(container.containerId, true)); container.launchContext = container.reInitContext.newLaunchContext; // Re configure the Retry Context @@ -1886,7 +1901,7 @@ public class ContainerImpl implements Container { if (container.containerMetrics != null) { container.containerMetrics .recordFinishTimeAndExitCode(clock.getTime(), container.exitCode); - container.containerMetrics.finished(); + container.containerMetrics.finished(false); } container.sendFinishedEvents(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ef4ff19/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java index a6aa337..e4a1550 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java @@ -242,14 +242,18 @@ public class ContainerMetrics implements MetricsSource { } } - public synchronized void finished() { + public synchronized void finished(boolean unregisterWithoutDelay) { if (!finished) { this.finished = true; if (timer != null) { timer.cancel(); timer = null; } - scheduleTimerTaskForUnregistration(); + if 
(!unregisterWithoutDelay) { + scheduleTimerTaskForUnregistration(); + } else { + ContainerMetrics.unregisterContainerMetrics(ContainerMetrics.this); + } this.pMemMBQuantiles.stop(); this.cpuCoreUsagePercentQuantiles.stop(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ef4ff19/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java index 240c5c0..810271e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java @@ -22,8 +22,20 @@ import org.apache.hadoop.yarn.api.records.ContainerId; public class ContainerStopMonitoringEvent extends ContainersMonitorEvent { + private final boolean forReInit; + public ContainerStopMonitoringEvent(ContainerId containerId) { super(containerId, ContainersMonitorEventType.STOP_MONITORING_CONTAINER); + forReInit = false; } + public ContainerStopMonitoringEvent(ContainerId containerId, + boolean forReInit) { + super(containerId, ContainersMonitorEventType.STOP_MONITORING_CONTAINER); + this.forReInit = forReInit; + } + + public boolean isForReInit() { + return forReInit; + } 
} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ef4ff19/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 33986a0..1007750 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -487,7 +487,7 @@ public class ContainersMonitorImpl extends AbstractService implements } catch (Exception e) { // Log the exception and proceed to the next container. 
LOG.warn("Uncaught exception in ContainersMonitorImpl " - + "while monitoring resource of " + containerId, e); + + "while monitoring resource of {}", containerId, e); } } if (LOG.isDebugEnabled()) { @@ -805,10 +805,12 @@ public class ContainersMonitorImpl extends AbstractService implements vmemLimitMBs, pmemLimitMBs, cpuVcores); break; case STOP_MONITORING_CONTAINER: + ContainerStopMonitoringEvent stopEvent = + (ContainerStopMonitoringEvent) monitoringEvent; usageMetrics = ContainerMetrics.getContainerMetrics( containerId); if (usageMetrics != null) { - usageMetrics.finished(); + usageMetrics.finished(stopEvent.isForReInit()); } break; case CHANGE_MONITORING_CONTAINER_RESOURCE: http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ef4ff19/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java index 1840d62..9caea70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java @@ -63,7 +63,7 @@ public class TestContainerMetrics { assertEquals(ERR, 1, collector.getRecords().size()); collector.clear(); - metrics.finished(); + metrics.finished(false); 
metrics.getMetrics(collector, true); assertEquals(ERR, 1, collector.getRecords().size()); collector.clear(); @@ -137,8 +137,8 @@ public class TestContainerMetrics { ContainerId containerId3 = ContainerId.newContainerId(appAttemptId, 3); ContainerMetrics metrics3 = ContainerMetrics.forContainer(system, containerId3, 1, 0); - metrics1.finished(); - metrics2.finished(); + metrics1.finished(false); + metrics2.finished(false); system.sampleMetrics(); system.sampleMetrics(); Thread.sleep(100); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org