This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2228f27  [Gobblin-1069][GOBBLIN-1069] Add NPE check in handleContainerCompletion method
2228f27 is described below

commit 2228f2783f760cac24e9c6a66e35ee517316504c
Author: Zihan Li <[email protected]>
AuthorDate: Fri Mar 6 16:46:17 2020 -0800

    [Gobblin-1069][GOBBLIN-1069] Add NPE check in handleContainerCompletion method
    
    Closes #2911 from ZihanLi58/GOBBLIN-1069-new
---
 .../java/org/apache/gobblin/yarn/YarnService.java  | 60 ++++++++++------------
 1 file changed, 27 insertions(+), 33 deletions(-)

diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index ff1f3ba..6476c99 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -620,11 +620,7 @@ public class YarnService extends AbstractIdleService {
    */
   protected void handleContainerCompletion(ContainerStatus containerStatus) {
     Map.Entry<Container, String> completedContainerEntry = 
this.containerMap.remove(containerStatus.getContainerId());
-    if (completedContainerEntry == null) {
-      //No map for this container means we don't maintain this container, 
directly return
-      return;
-    }
-    String completedInstanceName = completedContainerEntry.getValue();
+    String completedInstanceName = completedContainerEntry == null? "unknown" 
: completedContainerEntry.getValue();
 
     LOGGER.info(String.format("Container %s running Helix instance %s has 
completed with exit status %d",
         containerStatus.getContainerId(), completedInstanceName, 
containerStatus.getExitStatus()));
@@ -643,42 +639,40 @@ public class YarnService extends AbstractIdleService {
     if (this.shutdownInProgress) {
       return;
     }
+    if(completedContainerEntry != null) {
+      this.helixInstanceRetryCount.putIfAbsent(completedInstanceName, new 
AtomicInteger(0));
+      int retryCount = 
this.helixInstanceRetryCount.get(completedInstanceName).incrementAndGet();
 
-    this.helixInstanceRetryCount.putIfAbsent(completedInstanceName, new 
AtomicInteger(0));
-    int retryCount =
-        
this.helixInstanceRetryCount.get(completedInstanceName).incrementAndGet();
-
-    // Populate event metadata
-    Optional<ImmutableMap.Builder<String, String>> eventMetadataBuilder = 
Optional.absent();
-    if (this.eventSubmitter.isPresent()) {
-      eventMetadataBuilder = 
Optional.of(buildContainerStatusEventMetadata(containerStatus));
-      
eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.HELIX_INSTANCE_ID,
 completedInstanceName);
-      
eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_RETRY_ATTEMPT,
 retryCount + "");
-    }
-
-    if (this.helixInstanceMaxRetries > 0 && retryCount > 
this.helixInstanceMaxRetries) {
+      // Populate event metadata
+      Optional<ImmutableMap.Builder<String, String>> eventMetadataBuilder = 
Optional.absent();
       if (this.eventSubmitter.isPresent()) {
-        
this.eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION,
-            eventMetadataBuilder.get().build());
+        eventMetadataBuilder = 
Optional.of(buildContainerStatusEventMetadata(containerStatus));
+        
eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.HELIX_INSTANCE_ID,
 completedInstanceName);
+        
eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_RETRY_ATTEMPT,
 retryCount + "");
       }
 
-      LOGGER.warn("Maximum number of retries has been achieved for Helix 
instance " + completedInstanceName);
-      return;
-    }
+      if (this.helixInstanceMaxRetries > 0 && retryCount > 
this.helixInstanceMaxRetries) {
+        if (this.eventSubmitter.isPresent()) {
+          this.eventSubmitter.get()
+              
.submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, 
eventMetadataBuilder.get().build());
+        }
 
-    // Add the Helix instance name of the completed container to the queue of 
unused
-    // instance names so they can be reused by a replacement container.
-    this.unusedHelixInstanceNames.offer(completedInstanceName);
+        LOGGER.warn("Maximum number of retries has been achieved for Helix 
instance " + completedInstanceName);
+        return;
+      }
 
-    if (this.eventSubmitter.isPresent()) {
-      
this.eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION,
-          eventMetadataBuilder.get().build());
-    }
+      // Add the Helix instance name of the completed container to the queue 
of unused
+      // instance names so they can be reused by a replacement container.
+      this.unusedHelixInstanceNames.offer(completedInstanceName);
 
-    LOGGER.info(String.format("Requesting a new container to replace %s to run 
Helix instance %s",
-        containerStatus.getContainerId(), completedInstanceName));
+      if (this.eventSubmitter.isPresent()) {
+        this.eventSubmitter.get()
+            
.submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, 
eventMetadataBuilder.get().build());
+      }
+    }
+    LOGGER.info(String.format("Requesting a new container to replace %s to run 
Helix instance %s", containerStatus.getContainerId(), completedInstanceName));
     this.eventBus.post(new NewContainerRequest(
-        shouldStickToTheSameNode(containerStatus.getExitStatus()) ?
+        shouldStickToTheSameNode(containerStatus.getExitStatus()) && 
completedContainerEntry != null ?
             Optional.of(completedContainerEntry.getKey()) : 
Optional.<Container>absent()));
   }
 

Reply via email to