Repository: hadoop
Updated Branches:
refs/heads/trunk 1dabb31cd -> 1b790f4dd
YARN-9071. Improved status update for reinitialized containers.
Contributed by Chandni Singh
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1b790f4d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1b790f4d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1b790f4d
Branch: refs/heads/trunk
Commit: 1b790f4dd1f682423d5dbb8e70c6225cbddce989
Parents: 1dabb31
Author: Eric Yang <[email protected]>
Authored: Wed Dec 5 17:00:56 2018 -0500
Committer: Eric Yang <[email protected]>
Committed: Wed Dec 5 17:00:56 2018 -0500
----------------------------------------------------------------------
.../component/instance/ComponentInstance.java | 76 +++++++++++++-------
.../instance/TestComponentInstance.java | 36 ++++++++++
.../container/ContainerImpl.java | 23 ++++--
.../monitor/ContainerMetrics.java | 8 ++-
.../monitor/ContainerStopMonitoringEvent.java | 12 ++++
.../monitor/ContainersMonitorImpl.java | 6 +-
.../monitor/TestContainerMetrics.java | 6 +-
7 files changed, 131 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b790f4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
index 25aba77..ef844a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
@@ -152,10 +152,14 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
REINITIALIZED), START, new StartedAfterUpgradeTransition())
.addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, INIT),
STOP, new StoppedAfterCancelUpgradeTransition())
+
+ // FROM REINITIALIZED
.addTransition(REINITIALIZED, CANCEL_UPGRADING, CANCEL_UPGRADE,
new CancelledAfterReinitTransition())
.addTransition(REINITIALIZED, READY, BECOME_READY,
new ContainerBecomeReadyTransition(true))
+ .addTransition(REINITIALIZED, REINITIALIZED, STOP,
+ new StoppedAfterUpgradeTransition())
.installTopology();
public ComponentInstance(Component component,
@@ -182,20 +186,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
@Override public void transition(ComponentInstance compInstance,
ComponentInstanceEvent event) {
// Query container status for ip and host
- boolean cancelOnSuccess = true;
- if (compInstance.getCompSpec().getArtifact() != null && compInstance
- .getCompSpec().getArtifact().getType() == Artifact.TypeEnum.DOCKER) {
- // A docker container might get a different IP if the container is
- // relaunched by the NM, so we need to keep checking the status.
- // This is a temporary fix until the NM provides a callback for
- // container relaunch (see YARN-8265).
- cancelOnSuccess = false;
- }
- compInstance.containerStatusFuture =
- compInstance.scheduler.executorService.scheduleAtFixedRate(
- new ContainerStatusRetriever(compInstance.scheduler,
- event.getContainerId(), compInstance, cancelOnSuccess), 0, 1,
- TimeUnit.SECONDS);
+ compInstance.initializeStatusRetriever(event);
long containerStartTime = System.currentTimeMillis();
try {
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
@@ -275,6 +266,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
instance.upgradeInProgress.set(false);
instance.setContainerState(ContainerState.RUNNING_BUT_UNREADY);
+ instance.initializeStatusRetriever(event);
Component.UpgradeStatus status = instance.getState().equals(UPGRADING) ?
instance.component.getUpgradeStatus() :
@@ -570,13 +562,9 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
instance.setContainerState(ContainerState.UPGRADING);
instance.component.decContainersReady(false);
- Component.UpgradeStatus status = instance.component.getUpgradeStatus();
- instance.scheduler.getContainerLaunchService()
- .reInitCompInstance(instance.scheduler.getApp(), instance,
- instance.container,
- instance.component.createLaunchContext(
- status.getTargetSpec(),
- status.getTargetVersion()));
+ Component.UpgradeStatus upgradeStatus = instance.component.
+ getUpgradeStatus();
+ instance.reInitHelper(upgradeStatus);
}
}
@@ -632,11 +620,35 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
LOG.info("{} cancelling upgrade", container.getId());
setContainerState(ContainerState.UPGRADING);
Component.UpgradeStatus cancelStatus = component.getCancelUpgradeStatus();
+ reInitHelper(cancelStatus);
+ }
+
+ private void reInitHelper(Component.UpgradeStatus upgradeStatus) {
+ cancelContainerStatusRetriever();
+ setContainerStatus(null);
+ scheduler.executorService.submit(() -> cleanupRegistry(container.getId()));
scheduler.getContainerLaunchService()
.reInitCompInstance(scheduler.getApp(), this,
this.container, this.component.createLaunchContext(
- cancelStatus.getTargetSpec(),
- cancelStatus.getTargetVersion()));
+ upgradeStatus.getTargetSpec(),
+ upgradeStatus.getTargetVersion()));
+ }
+
+ private void initializeStatusRetriever(ComponentInstanceEvent event) {
+ boolean cancelOnSuccess = true;
+ if (getCompSpec().getArtifact() != null &&
+ getCompSpec().getArtifact().getType() == Artifact.TypeEnum.DOCKER) {
+ // A docker container might get a different IP if the container is
+ // relaunched by the NM, so we need to keep checking the status.
+ // This is a temporary fix until the NM provides a callback for
+ // container relaunch (see YARN-8265).
+ cancelOnSuccess = false;
+ }
+ containerStatusFuture =
+ scheduler.executorService.scheduleAtFixedRate(
+ new ContainerStatusRetriever(scheduler, event.getContainerId(),
+ this, cancelOnSuccess), 0, 1,
+ TimeUnit.SECONDS);
}
public ComponentInstanceState getState() {
@@ -723,11 +735,25 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
}
public ContainerStatus getContainerStatus() {
- return status;
+ try {
+ readLock.lock();
+ return status;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ private void setContainerStatus(ContainerStatus latestStatus) {
+ try {
+ writeLock.lock();
+ this.status = latestStatus;
+ } finally {
+ writeLock.unlock();
+ }
}
public void updateContainerStatus(ContainerStatus status) {
- this.status = status;
+ setContainerStatus(status);
org.apache.hadoop.yarn.service.api.records.Container container =
getCompSpec().getContainer(status.getContainerId().toString());
boolean doRegistryUpdate = true;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b790f4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java
index 09652d7..f857353 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java
@@ -141,6 +141,42 @@ public class TestComponentInstance {
}
@Test
+ public void testFailureAfterReinit() throws Exception {
+ ServiceContext context = TestComponent.createTestContext(rule,
+ "testContainerUpgradeFailed");
+ Component component = context.scheduler.getAllComponents().entrySet()
+ .iterator().next().getValue();
+ upgradeComponent(component);
+
+ ComponentInstance instance =
component.getAllComponentInstances().iterator()
+ .next();
+
+ ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
+ instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
+ instance.handle(upgradeEvent);
+
+ // NM finished upgrade
+ instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(),
+ ComponentInstanceEventType.START));
+ Assert.assertEquals("instance not running",
+ ContainerState.RUNNING_BUT_UNREADY,
+ component.getComponentSpec().getContainer(instance.getContainer()
+ .getId().toString()).getState());
+
+ ContainerStatus containerStatus = mock(ContainerStatus.class);
+ when(containerStatus.getExitStatus()).thenReturn(
+ ContainerExitStatus.ABORTED);
+ ComponentInstanceEvent stopEvent = new ComponentInstanceEvent(
+ instance.getContainer().getId(), ComponentInstanceEventType.STOP)
+ .setStatus(containerStatus);
+ // this is the callback from NM for the upgrade
+ instance.handle(stopEvent);
+ Assert.assertEquals("instance did not fail", ContainerState.FAILED_UPGRADE,
+ component.getComponentSpec().getContainer(instance.getContainer()
+ .getId().toString()).getState());
+ }
+
+ @Test
public void testCancelNothingToUpgrade() throws Exception {
ServiceContext context = TestComponent.createTestContext(rule,
"testCancelUpgradeWhenContainerReady");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b790f4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 6716dbb..633052e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -934,10 +934,21 @@ public class ContainerImpl implements Container {
this.workDir = workDir;
}
+ private void clearIpAndHost() {
+ LOG.info("{} clearing ip and host", containerId);
+ this.ips = null;
+ this.host = null;
+ }
+
@Override
public void setIpAndHost(String[] ipAndHost) {
- this.ips = ipAndHost[0];
- this.host = ipAndHost[1];
+ try {
+ this.writeLock.lock();
+ this.ips = ipAndHost[0];
+ this.host = ipAndHost[1];
+ } finally {
+ this.writeLock.unlock();
+ }
}
@Override
@@ -1729,7 +1740,11 @@ public class ContainerImpl implements Container {
+ "] for re-initialization !!");
container.wasLaunched = false;
container.metrics.endRunningContainer();
-
+ container.clearIpAndHost();
+ // Remove the container from the resource-monitor. When container
+ // is launched again, it is added back to monitoring service.
+ container.dispatcher.getEventHandler().handle(
+ new ContainerStopMonitoringEvent(container.containerId, true));
container.launchContext = container.reInitContext.newLaunchContext;
// Re configure the Retry Context
@@ -1894,7 +1909,7 @@ public class ContainerImpl implements Container {
if (container.containerMetrics != null) {
container.containerMetrics
.recordFinishTimeAndExitCode(clock.getTime(), container.exitCode);
- container.containerMetrics.finished();
+ container.containerMetrics.finished(false);
}
container.sendFinishedEvents();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b790f4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
index 2a95849..bca7c3f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
@@ -242,14 +242,18 @@ public class ContainerMetrics implements MetricsSource {
}
}
- public synchronized void finished() {
+ public synchronized void finished(boolean unregisterWithoutDelay) {
if (!finished) {
this.finished = true;
if (timer != null) {
timer.cancel();
timer = null;
}
- scheduleTimerTaskForUnregistration();
+ if (!unregisterWithoutDelay) {
+ scheduleTimerTaskForUnregistration();
+ } else {
+ ContainerMetrics.unregisterContainerMetrics(ContainerMetrics.this);
+ }
this.pMemMBQuantiles.stop();
this.cpuCoreUsagePercentQuantiles.stop();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b790f4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java
index 240c5c0..810271e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStopMonitoringEvent.java
@@ -22,8 +22,20 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
public class ContainerStopMonitoringEvent extends ContainersMonitorEvent {
+ private final boolean forReInit;
+
public ContainerStopMonitoringEvent(ContainerId containerId) {
super(containerId, ContainersMonitorEventType.STOP_MONITORING_CONTAINER);
+ forReInit = false;
}
+ public ContainerStopMonitoringEvent(ContainerId containerId,
+ boolean forReInit) {
+ super(containerId, ContainersMonitorEventType.STOP_MONITORING_CONTAINER);
+ this.forReInit = forReInit;
+ }
+
+ public boolean isForReInit() {
+ return forReInit;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b790f4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index b9e2c68..b7fca86 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -533,7 +533,7 @@ public class ContainersMonitorImpl extends AbstractService implements
} catch (Exception e) {
// Log the exception and proceed to the next container.
LOG.warn("Uncaught exception in ContainersMonitorImpl "
- + "while monitoring resource of " + containerId, e);
+ + "while monitoring resource of {}", containerId, e);
}
}
if (LOG.isDebugEnabled()) {
@@ -861,10 +861,12 @@ public class ContainersMonitorImpl extends AbstractService implements
vmemLimitMBs, pmemLimitMBs, cpuVcores);
break;
case STOP_MONITORING_CONTAINER:
+ ContainerStopMonitoringEvent stopEvent =
+ (ContainerStopMonitoringEvent) monitoringEvent;
usageMetrics = ContainerMetrics.getContainerMetrics(
containerId);
if (usageMetrics != null) {
- usageMetrics.finished();
+ usageMetrics.finished(stopEvent.isForReInit());
}
break;
case CHANGE_MONITORING_CONTAINER_RESOURCE:
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b790f4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java
index 8b2bff1..ce1baf9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java
@@ -63,7 +63,7 @@ public class TestContainerMetrics {
assertEquals(ERR, 1, collector.getRecords().size());
collector.clear();
- metrics.finished();
+ metrics.finished(false);
metrics.getMetrics(collector, true);
assertEquals(ERR, 1, collector.getRecords().size());
collector.clear();
@@ -137,8 +137,8 @@ public class TestContainerMetrics {
ContainerId containerId3 = ContainerId.newContainerId(appAttemptId, 3);
ContainerMetrics metrics3 = ContainerMetrics.forContainer(system,
containerId3, 1, 0);
- metrics1.finished();
- metrics2.finished();
+ metrics1.finished(false);
+ metrics2.finished(false);
system.sampleMetrics();
system.sampleMetrics();
Thread.sleep(100);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]