YARN-3464. Race condition in LocalizerRunner kills localizer before localizing 
all resources. (Zhihai Xu via kasha)

(cherry picked from commit 47279c3228185548ed09c36579b420225e4894f5)
(cherry picked from commit 4045c41afe440b773d006e962bf8a5eae3fdc284)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4ddcc7e5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4ddcc7e5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4ddcc7e5

Branch: refs/heads/branch-2.7
Commit: 4ddcc7e5b5b6d7d01d6dc3c79fa330d6a44e59ba
Parents: bec78f9
Author: Karthik Kambatla <ka...@apache.org>
Authored: Sun Apr 26 09:13:46 2015 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Mon Apr 27 13:37:06 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  2 +
 .../container/ContainerImpl.java                |  8 ++-
 .../localizer/ResourceLocalizationService.java  | 53 +++++++++++++++-----
 .../localizer/event/LocalizationEventType.java  |  1 +
 .../TestResourceLocalizationService.java        | 12 ++++-
 5 files changed, 61 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ddcc7e5/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 5760f34..4f55003 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -41,6 +41,8 @@ Release 2.7.1 - UNRELEASED
     YARN-3516. killing ContainerLocalizer action doesn't take effect when
     private localizer receives FETCH_FAILURE status.(zhihai xu via xgong)
 
+    YARN-3464. Race condition in LocalizerRunner kills localizer before 
+    localizing all resources. (Zhihai Xu via kasha)
 
 Release 2.7.0 - 2015-04-20
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ddcc7e5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 131d439..f55e0e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -59,7 +59,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
@@ -715,7 +717,12 @@ public class ContainerImpl implements Container {
         return ContainerState.LOCALIZING;
       }
 
+      container.dispatcher.getEventHandler().handle(
+          new ContainerLocalizationEvent(LocalizationEventType.
+              CONTAINER_RESOURCES_LOCALIZED, container));
+
       container.sendLaunchEvent();
+      container.metrics.endInitingContainer();
 
       // If this is a recovered container that has already launched, skip
       // uploading resources to the shared cache. We do this to avoid uploading
@@ -733,7 +740,6 @@ public class ContainerImpl implements Container {
                 SharedCacheUploadEventType.UPLOAD));
       }
 
-      container.metrics.endInitingContainer();
       return ContainerState.LOCALIZED;
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ddcc7e5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index bb05946..4fb0ed7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -35,6 +35,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
@@ -108,6 +109,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -389,6 +391,9 @@ public class ResourceLocalizationService extends CompositeService
     case INIT_CONTAINER_RESOURCES:
       handleInitContainerResources((ContainerLocalizationRequestEvent) event);
       break;
+    case CONTAINER_RESOURCES_LOCALIZED:
+      handleContainerResourcesLocalized((ContainerLocalizationEvent) event);
+      break;
     case CACHE_CLEANUP:
       handleCacheCleanup(event);
       break;
@@ -451,7 +456,18 @@ public class ResourceLocalizationService extends CompositeService
       }
     }
   }
-  
+
+  /**
+   * Once a container's resources are localized, kill the corresponding
+   * {@link ContainerLocalizer}
+   */
+  private void handleContainerResourcesLocalized(
+      ContainerLocalizationEvent event) {
+    Container c = event.getContainer();
+    String locId = ConverterUtils.toString(c.getContainerId());
+    localizerTracker.endContainerLocalization(locId);
+  }
+
   private void handleCacheCleanup(LocalizationEvent event) {
     ResourceRetentionSet retain =
       new ResourceRetentionSet(delService, cacheTargetSize);
@@ -662,7 +678,7 @@ public class ResourceLocalizationService extends CompositeService
           response.setLocalizerAction(LocalizerAction.DIE);
           return response;
         }
-        return localizer.update(status.getResources());
+        return localizer.processHeartbeat(status.getResources());
       }
     }
     
@@ -716,6 +732,17 @@ public class ResourceLocalizationService extends CompositeService
         localizer.interrupt();
       }
     }
+
+    public void endContainerLocalization(String locId) {
+      LocalizerRunner localizer;
+      synchronized (privLocalizers) {
+        localizer = privLocalizers.get(locId);
+        if (null == localizer) {
+          return; // ignore
+        }
+      }
+      localizer.endContainerLocalization();
+    }
   }
   
 
@@ -870,6 +897,7 @@ public class ResourceLocalizationService extends CompositeService
     final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled;
     // Its a shared list between Private Localizer and dispatcher thread.
     final List<LocalizerResourceRequestEvent> pending;
+    private AtomicBoolean killContainerLocalizer = new AtomicBoolean(false);
 
     // TODO: threadsafe, use outer?
     private final RecordFactory recordFactory =
@@ -890,6 +918,10 @@ public class ResourceLocalizationService extends CompositeService
       pending.add(request);
     }
 
+    public void endContainerLocalization() {
+      killContainerLocalizer.set(true);
+    }
+
     /**
      * Find next resource to be given to a spawned localizer.
      * 
@@ -936,7 +968,7 @@ public class ResourceLocalizationService extends CompositeService
       }
     }
 
-    LocalizerHeartbeatResponse update(
+    LocalizerHeartbeatResponse processHeartbeat(
         List<LocalResourceStatus> remoteResourceStatuses) {
       LocalizerHeartbeatResponse response =
         recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
@@ -945,7 +977,7 @@ public class ResourceLocalizationService extends CompositeService
       ApplicationId applicationId =
           context.getContainerId().getApplicationAttemptId().getApplicationId();
 
-      LocalizerAction action = LocalizerAction.LIVE;
+      boolean fetchFailed = false;
       // Update resource statuses.
       for (LocalResourceStatus stat : remoteResourceStatuses) {
         LocalResource rsrc = stat.getResource();
@@ -981,7 +1013,7 @@ public class ResourceLocalizationService extends CompositeService
           case FETCH_FAILURE:
             final String diagnostics = stat.getException().toString();
             LOG.warn(req + " failed: " + diagnostics);
-            action = LocalizerAction.DIE;
+            fetchFailed = true;
             getLocalResourcesTracker(req.getVisibility(), user, applicationId)
               .handle(new ResourceFailedLocalizationEvent(
                   req, diagnostics));
@@ -993,15 +1025,15 @@ public class ResourceLocalizationService extends CompositeService
             break;
           default:
             LOG.info("Unknown status: " + stat.getStatus());
-            action = LocalizerAction.DIE;
+            fetchFailed = true;
             getLocalResourcesTracker(req.getVisibility(), user, applicationId)
               .handle(new ResourceFailedLocalizationEvent(
                   req, stat.getException().getMessage()));
             break;
         }
       }
-      if (action == LocalizerAction.DIE) {
-        response.setLocalizerAction(action);
+      if (fetchFailed || killContainerLocalizer.get()) {
+        response.setLocalizerAction(LocalizerAction.DIE);
         return response;
       }
 
@@ -1029,12 +1061,9 @@ public class ResourceLocalizationService extends CompositeService
         } catch (URISyntaxException e) {
             //TODO fail? Already translated several times...
         }
-      } else if (pending.isEmpty()) {
-        // TODO: Synchronization
-        action = LocalizerAction.DIE;
       }
 
-      response.setLocalizerAction(action);
+      response.setLocalizerAction(LocalizerAction.LIVE);
       response.setResourceSpecs(rsrcs);
       return response;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ddcc7e5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
index 5134349..4785fba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
@@ -23,4 +23,5 @@ public enum LocalizationEventType {
   CACHE_CLEANUP,
   CLEANUP_CONTAINER_RESOURCES,
   DESTROY_APPLICATION_RESOURCES,
+  CONTAINER_RESOURCES_LOCALIZED,
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ddcc7e5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index d3c3521..2edaf45 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -125,6 +125,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Reso
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.PublicLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -975,7 +976,8 @@ public class TestResourceLocalizationService {
         .thenReturn(Collections.<LocalResourceStatus>emptyList())
         .thenReturn(Collections.singletonList(rsrc1success))
         .thenReturn(Collections.singletonList(rsrc2pending))
-        .thenReturn(rsrcs4);
+        .thenReturn(rsrcs4)
+        .thenReturn(Collections.<LocalResourceStatus>emptyList());
 
       String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE +
           Path.SEPARATOR + "user0" + Path.SEPARATOR +
@@ -1019,7 +1021,13 @@ public class TestResourceLocalizationService {
       assertTrue(localizedPath.getFile().endsWith(
           localPath + Path.SEPARATOR + "1" + Path.SEPARATOR + "12"));
 
-      // get shutdown
+      response = spyService.heartbeat(stat);
+      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+
+      spyService.handle(new ContainerLocalizationEvent(
+          LocalizationEventType.CONTAINER_RESOURCES_LOCALIZED, c));
+
+      // get shutdown after receive CONTAINER_RESOURCES_LOCALIZED event
       response = spyService.heartbeat(stat);
       assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
 

Reply via email to