YARN-3464. Race condition in LocalizerRunner kills localizer before localizing all resources. (Zhihai Xu via kasha)
(cherry picked from commit 47279c3228185548ed09c36579b420225e4894f5) (cherry picked from commit 4045c41afe440b773d006e962bf8a5eae3fdc284) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4ddcc7e5 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4ddcc7e5 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4ddcc7e5 Branch: refs/heads/branch-2.7 Commit: 4ddcc7e5b5b6d7d01d6dc3c79fa330d6a44e59ba Parents: bec78f9 Author: Karthik Kambatla <ka...@apache.org> Authored: Sun Apr 26 09:13:46 2015 -0700 Committer: Karthik Kambatla <ka...@apache.org> Committed: Mon Apr 27 13:37:06 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 2 + .../container/ContainerImpl.java | 8 ++- .../localizer/ResourceLocalizationService.java | 53 +++++++++++++++----- .../localizer/event/LocalizationEventType.java | 1 + .../TestResourceLocalizationService.java | 12 ++++- 5 files changed, 61 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ddcc7e5/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 5760f34..4f55003 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -41,6 +41,8 @@ Release 2.7.1 - UNRELEASED YARN-3516. killing ContainerLocalizer action doesn't take effect when private localizer receives FETCH_FAILURE status.(zhihai xu via xgong) + YARN-3464. Race condition in LocalizerRunner kills localizer before + localizing all resources. (Zhihai Xu via kasha) Release 2.7.0 - 2015-04-20 http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ddcc7e5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 131d439..f55e0e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -59,7 +59,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; @@ -715,7 +717,12 @@ public class ContainerImpl implements Container { return ContainerState.LOCALIZING; } + container.dispatcher.getEventHandler().handle( + new ContainerLocalizationEvent(LocalizationEventType. + CONTAINER_RESOURCES_LOCALIZED, container)); + container.sendLaunchEvent(); + container.metrics.endInitingContainer(); // If this is a recovered container that has already launched, skip // uploading resources to the shared cache. We do this to avoid uploading @@ -733,7 +740,6 @@ public class ContainerImpl implements Container { SharedCacheUploadEventType.UPLOAD)); } - container.metrics.endInitingContainer(); return ContainerState.LOCALIZED; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ddcc7e5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index bb05946..4fb0ed7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -35,6 +35,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; @@ -108,6 +109,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; @@ -389,6 +391,9 @@ public class ResourceLocalizationService extends CompositeService case INIT_CONTAINER_RESOURCES: handleInitContainerResources((ContainerLocalizationRequestEvent) event); break; + case CONTAINER_RESOURCES_LOCALIZED: + handleContainerResourcesLocalized((ContainerLocalizationEvent) event); + break; case CACHE_CLEANUP: handleCacheCleanup(event); break; @@ -451,7 +456,18 @@ public class ResourceLocalizationService extends CompositeService } } } - + + /** + * Once a container's resources are localized, kill the corresponding + * {@link ContainerLocalizer} + */ + private void handleContainerResourcesLocalized( + ContainerLocalizationEvent event) { + Container c = event.getContainer(); + String locId = ConverterUtils.toString(c.getContainerId()); + localizerTracker.endContainerLocalization(locId); + } + private void handleCacheCleanup(LocalizationEvent event) { ResourceRetentionSet retain = new ResourceRetentionSet(delService, cacheTargetSize); @@ -662,7 +678,7 @@ public class ResourceLocalizationService extends CompositeService response.setLocalizerAction(LocalizerAction.DIE); return response; } - return localizer.update(status.getResources()); + return localizer.processHeartbeat(status.getResources()); } } @@ -716,6 +732,17 @@ public class ResourceLocalizationService extends CompositeService localizer.interrupt(); } } + + public void endContainerLocalization(String locId) { + LocalizerRunner localizer; + synchronized (privLocalizers) { + localizer = privLocalizers.get(locId); + if (null == localizer) { + return; // ignore + } + } + localizer.endContainerLocalization(); + } } @@ -870,6 +897,7 @@ public class ResourceLocalizationService extends CompositeService final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled; // Its a shared list between Private Localizer and dispatcher thread. final List<LocalizerResourceRequestEvent> pending; + private AtomicBoolean killContainerLocalizer = new AtomicBoolean(false); // TODO: threadsafe, use outer? private final RecordFactory recordFactory = @@ -890,6 +918,10 @@ public class ResourceLocalizationService extends CompositeService pending.add(request); } + public void endContainerLocalization() { + killContainerLocalizer.set(true); + } + /** * Find next resource to be given to a spawned localizer. * @@ -936,7 +968,7 @@ public class ResourceLocalizationService extends CompositeService } } - LocalizerHeartbeatResponse update( + LocalizerHeartbeatResponse processHeartbeat( List<LocalResourceStatus> remoteResourceStatuses) { LocalizerHeartbeatResponse response = recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class); @@ -945,7 +977,7 @@ public class ResourceLocalizationService extends CompositeService ApplicationId applicationId = context.getContainerId().getApplicationAttemptId().getApplicationId(); - LocalizerAction action = LocalizerAction.LIVE; + boolean fetchFailed = false; // Update resource statuses. for (LocalResourceStatus stat : remoteResourceStatuses) { LocalResource rsrc = stat.getResource(); @@ -981,7 +1013,7 @@ public class ResourceLocalizationService extends CompositeService case FETCH_FAILURE: final String diagnostics = stat.getException().toString(); LOG.warn(req + " failed: " + diagnostics); - action = LocalizerAction.DIE; + fetchFailed = true; getLocalResourcesTracker(req.getVisibility(), user, applicationId) .handle(new ResourceFailedLocalizationEvent( req, diagnostics)); @@ -993,15 +1025,15 @@ public class ResourceLocalizationService extends CompositeService break; default: LOG.info("Unknown status: " + stat.getStatus()); - action = LocalizerAction.DIE; + fetchFailed = true; getLocalResourcesTracker(req.getVisibility(), user, applicationId) .handle(new ResourceFailedLocalizationEvent( req, stat.getException().getMessage())); break; } } - if (action == LocalizerAction.DIE) { - response.setLocalizerAction(action); + if (fetchFailed || killContainerLocalizer.get()) { + response.setLocalizerAction(LocalizerAction.DIE); return response; } @@ -1029,12 +1061,9 @@ public class ResourceLocalizationService extends CompositeService } catch (URISyntaxException e) { //TODO fail? Already translated several times... } - } else if (pending.isEmpty()) { - // TODO: Synchronization - action = LocalizerAction.DIE; } - response.setLocalizerAction(action); + response.setLocalizerAction(LocalizerAction.LIVE); response.setResourceSpecs(rsrcs); return response; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ddcc7e5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java index 5134349..4785fba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java @@ -23,4 +23,5 @@ public enum LocalizationEventType { CACHE_CLEANUP, CLEANUP_CONTAINER_RESOURCES, DESTROY_APPLICATION_RESOURCES, + CONTAINER_RESOURCES_LOCALIZED, } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ddcc7e5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index d3c3521..2edaf45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -125,6 +125,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Reso import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.PublicLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; @@ -975,7 +976,8 @@ public class TestResourceLocalizationService { .thenReturn(Collections.<LocalResourceStatus>emptyList()) .thenReturn(Collections.singletonList(rsrc1success)) .thenReturn(Collections.singletonList(rsrc2pending)) - .thenReturn(rsrcs4); + .thenReturn(rsrcs4) + .thenReturn(Collections.<LocalResourceStatus>emptyList()); String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR + "user0" + Path.SEPARATOR + @@ -1019,7 +1021,13 @@ public class TestResourceLocalizationService { assertTrue(localizedPath.getFile().endsWith( localPath + Path.SEPARATOR + "1" + Path.SEPARATOR + "12")); - // get shutdown + response = spyService.heartbeat(stat); + assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); + + spyService.handle(new ContainerLocalizationEvent( + LocalizationEventType.CONTAINER_RESOURCES_LOCALIZED, c)); + + // get shutdown after receive CONTAINER_RESOURCES_LOCALIZED event response = spyService.heartbeat(stat); assertEquals(LocalizerAction.DIE, response.getLocalizerAction());