This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 633891080 [GOBBLIN-1822]Logging Abnormal Helix Task States (#3685)
633891080 is described below

commit 633891080536b39f8c9c127af932d7e8b601b79e
Author: Zihan Li <[email protected]>
AuthorDate: Wed Apr 26 10:05:26 2023 -0700

    [GOBBLIN-1822]Logging Abnormal Helix Task States (#3685)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * [GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter
    
    * Revert "[GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter"
    
    This reverts commit b0844e8d7740b9eaa21132604f532a964c3f9e52.
    
    * [GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter
    
    * add java doc
    
    * support configurable behavior to use custom metadata retention policy in iceberg writer
    
    * [GOBBLIN-1822] Logging Abnormal Helix Task States
    
    * address comments to clean code and add log for tasks that retried multiple times
    
    * address comments to improve code readability
    
    * remove unused import
    
    ---------
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../gobblin/yarn/YarnAutoScalingManager.java       | 26 +++++++++++++++++++++-
 1 file changed, 25 insertions(+), 1 deletion(-)

diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index 4ed63b744..5f5c872c6 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -30,6 +30,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+
+import org.apache.commons.compress.utils.Sets;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -38,6 +40,7 @@ import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobDag;
 import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskPartitionState;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
@@ -68,6 +71,7 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
   private final String AUTO_SCALING_PREFIX = 
GobblinYarnConfigurationKeys.GOBBLIN_YARN_PREFIX + "autoScaling.";
   private final String AUTO_SCALING_POLLING_INTERVAL_SECS =
       AUTO_SCALING_PREFIX + "pollingIntervalSeconds";
+  private static final int THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING = 20;
   private final int DEFAULT_AUTO_SCALING_POLLING_INTERVAL_SECS = 60;
   // Only one container will be requested for each N partitions of work
   private final String AUTO_SCALING_PARTITIONS_PER_CONTAINER = 
AUTO_SCALING_PREFIX + "partitionsPerContainer";
@@ -94,6 +98,8 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
   private final double overProvisionFactor;
   private final SlidingWindowReservoir slidingFixedSizeWindow;
   private static int maxIdleTimeInMinutesBeforeScalingDown = 
DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES;
+  private static final HashSet<TaskPartitionState>
+      UNUSUAL_HELIX_TASK_STATES = Sets.newHashSet(TaskPartitionState.ERROR, 
TaskPartitionState.DROPPED);
 
   public YarnAutoScalingManager(GobblinApplicationMaster appMaster) {
     this.config = appMaster.getConfig();
@@ -189,6 +195,21 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
           .keySet().stream().filter(x -> filterString.isEmpty() || 
x.contains(filterString)).collect(Collectors.toSet());
     }
 
+    private String getInuseParticipantForHelixPartition(JobContext jobContext, 
int partition) {
+      if (jobContext.getPartitionNumAttempts(partition) > 
THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING) {
+        log.warn("Helix task {} has been retried for {} times, please check 
the config to see how we can handle this task better",
+            jobContext.getTaskIdForPartition(partition), 
jobContext.getPartitionNumAttempts(partition));
+      }
+      if 
(!UNUSUAL_HELIX_TASK_STATES.contains(jobContext.getPartitionState(partition))) {
+        return jobContext.getAssignedParticipant(partition);
+      }
+      // adding log here now for debugging
+      //todo: if this happens frequently, we should reset to status to 
retriable or at least report the error earlier
+      log.info("Helix task {} is in {} state which is unexpected, please watch 
out to see if this get recovered",
+          jobContext.getTaskIdForPartition(partition), 
jobContext.getPartitionState(partition));
+      return null;
+    }
+
     /**
      * Iterate through the workflows configured in Helix to figure out the 
number of required partitions
      * and request the {@link YarnService} to scale to the desired number of 
containers.
@@ -220,7 +241,10 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
           int numPartitions = 0;
           String jobTag = defaultHelixInstanceTags;
           if (jobContext != null) {
-            
inUseInstances.addAll(jobContext.getPartitionSet().stream().map(jobContext::getAssignedParticipant)
+            log.debug("JobContext {} num partitions {}", jobContext, 
jobContext.getPartitionSet().size());
+
+            inUseInstances.addAll(jobContext.getPartitionSet().stream()
+                .map(i -> getInuseParticipantForHelixPartition(jobContext, i))
                 .filter(Objects::nonNull).collect(Collectors.toSet()));
 
             numPartitions = jobContext.getPartitionSet().size();

Reply via email to