This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2228f27 [Gobblin-1069][GOBBLIN-1069] Add NPE check in handleContainerCompletion method
2228f27 is described below
commit 2228f2783f760cac24e9c6a66e35ee517316504c
Author: Zihan Li <[email protected]>
AuthorDate: Fri Mar 6 16:46:17 2020 -0800
[Gobblin-1069][GOBBLIN-1069] Add NPE check in handleContainerCompletion method
Closes #2911 from ZihanLi58/GOBBLIN-1069-new
---
.../java/org/apache/gobblin/yarn/YarnService.java | 60 ++++++++++------------
1 file changed, 27 insertions(+), 33 deletions(-)
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index ff1f3ba..6476c99 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -620,11 +620,7 @@ public class YarnService extends AbstractIdleService {
*/
protected void handleContainerCompletion(ContainerStatus containerStatus) {
Map.Entry<Container, String> completedContainerEntry =
this.containerMap.remove(containerStatus.getContainerId());
- if (completedContainerEntry == null) {
- //No map for this container means we don't maintain this container, directly return
- return;
- }
- String completedInstanceName = completedContainerEntry.getValue();
+ String completedInstanceName = completedContainerEntry == null? "unknown" : completedContainerEntry.getValue();
LOGGER.info(String.format("Container %s running Helix instance %s has completed with exit status %d",
containerStatus.getContainerId(), completedInstanceName,
containerStatus.getExitStatus()));
@@ -643,42 +639,40 @@ public class YarnService extends AbstractIdleService {
if (this.shutdownInProgress) {
return;
}
+ if(completedContainerEntry != null) {
+ this.helixInstanceRetryCount.putIfAbsent(completedInstanceName, new
AtomicInteger(0));
+ int retryCount =
this.helixInstanceRetryCount.get(completedInstanceName).incrementAndGet();
- this.helixInstanceRetryCount.putIfAbsent(completedInstanceName, new
AtomicInteger(0));
- int retryCount =
-
this.helixInstanceRetryCount.get(completedInstanceName).incrementAndGet();
-
- // Populate event metadata
- Optional<ImmutableMap.Builder<String, String>> eventMetadataBuilder =
Optional.absent();
- if (this.eventSubmitter.isPresent()) {
- eventMetadataBuilder =
Optional.of(buildContainerStatusEventMetadata(containerStatus));
-
eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.HELIX_INSTANCE_ID,
completedInstanceName);
-
eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_RETRY_ATTEMPT,
retryCount + "");
- }
-
- if (this.helixInstanceMaxRetries > 0 && retryCount >
this.helixInstanceMaxRetries) {
+ // Populate event metadata
+ Optional<ImmutableMap.Builder<String, String>> eventMetadataBuilder =
Optional.absent();
if (this.eventSubmitter.isPresent()) {
-
this.eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION,
- eventMetadataBuilder.get().build());
+ eventMetadataBuilder =
Optional.of(buildContainerStatusEventMetadata(containerStatus));
+
eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.HELIX_INSTANCE_ID,
completedInstanceName);
+
eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_RETRY_ATTEMPT,
retryCount + "");
}
- LOGGER.warn("Maximum number of retries has been achieved for Helix
instance " + completedInstanceName);
- return;
- }
+ if (this.helixInstanceMaxRetries > 0 && retryCount >
this.helixInstanceMaxRetries) {
+ if (this.eventSubmitter.isPresent()) {
+ this.eventSubmitter.get()
+
.submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION,
eventMetadataBuilder.get().build());
+ }
- // Add the Helix instance name of the completed container to the queue of
unused
- // instance names so they can be reused by a replacement container.
- this.unusedHelixInstanceNames.offer(completedInstanceName);
+ LOGGER.warn("Maximum number of retries has been achieved for Helix
instance " + completedInstanceName);
+ return;
+ }
- if (this.eventSubmitter.isPresent()) {
-
this.eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION,
- eventMetadataBuilder.get().build());
- }
+ // Add the Helix instance name of the completed container to the queue
of unused
+ // instance names so they can be reused by a replacement container.
+ this.unusedHelixInstanceNames.offer(completedInstanceName);
- LOGGER.info(String.format("Requesting a new container to replace %s to run
Helix instance %s",
- containerStatus.getContainerId(), completedInstanceName));
+ if (this.eventSubmitter.isPresent()) {
+ this.eventSubmitter.get()
+
.submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION,
eventMetadataBuilder.get().build());
+ }
+ }
+ LOGGER.info(String.format("Requesting a new container to replace %s to run Helix instance %s", containerStatus.getContainerId(), completedInstanceName));
this.eventBus.post(new NewContainerRequest(
- shouldStickToTheSameNode(containerStatus.getExitStatus()) ?
+ shouldStickToTheSameNode(containerStatus.getExitStatus()) &&
completedContainerEntry != null ?
Optional.of(completedContainerEntry.getKey()) :
Optional.<Container>absent()));
}