This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 633891080 [GOBBLIN-1822]Logging Abnormal Helix Task States (#3685)
633891080 is described below
commit 633891080536b39f8c9c127af932d7e8b601b79e
Author: Zihan Li <[email protected]>
AuthorDate: Wed Apr 26 10:05:26 2023 -0700
[GOBBLIN-1822]Logging Abnormal Helix Task States (#3685)
* address comments
* use connectionmanager when httpclient is not closeable
* [GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter
* Revert "[GOBBLIN-1810] Support general iceberg catalog in
icebergMetadataWriter"
This reverts commit b0844e8d7740b9eaa21132604f532a964c3f9e52.
* [GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter
* add java doc
* support configurable behavior to use custom metadata retention policy in
iceberg writer
* [GOBBLIN-1822] Logging Abnormal Helix Task States
* address comments to clean code and add log for tasks that were retried
multiple times
* address comments to improve code readability
* remove unused import
---------
Co-authored-by: Zihan Li <[email protected]>
---
.../gobblin/yarn/YarnAutoScalingManager.java | 26 +++++++++++++++++++++-
1 file changed, 25 insertions(+), 1 deletion(-)
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index 4ed63b744..5f5c872c6 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -30,6 +30,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+
+import org.apache.commons.compress.utils.Sets;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
@@ -38,6 +40,7 @@ import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobDag;
import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
@@ -68,6 +71,7 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
private final String AUTO_SCALING_PREFIX =
GobblinYarnConfigurationKeys.GOBBLIN_YARN_PREFIX + "autoScaling.";
private final String AUTO_SCALING_POLLING_INTERVAL_SECS =
AUTO_SCALING_PREFIX + "pollingIntervalSeconds";
+ private static final int THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING = 20;
private final int DEFAULT_AUTO_SCALING_POLLING_INTERVAL_SECS = 60;
// Only one container will be requested for each N partitions of work
private final String AUTO_SCALING_PARTITIONS_PER_CONTAINER =
AUTO_SCALING_PREFIX + "partitionsPerContainer";
@@ -94,6 +98,8 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
private final double overProvisionFactor;
private final SlidingWindowReservoir slidingFixedSizeWindow;
private static int maxIdleTimeInMinutesBeforeScalingDown =
DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES;
+ private static final HashSet<TaskPartitionState>
+ UNUSUAL_HELIX_TASK_STATES = Sets.newHashSet(TaskPartitionState.ERROR,
TaskPartitionState.DROPPED);
public YarnAutoScalingManager(GobblinApplicationMaster appMaster) {
this.config = appMaster.getConfig();
@@ -189,6 +195,21 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
.keySet().stream().filter(x -> filterString.isEmpty() ||
x.contains(filterString)).collect(Collectors.toSet());
}
+ private String getInuseParticipantForHelixPartition(JobContext jobContext,
int partition) {
+ if (jobContext.getPartitionNumAttempts(partition) >
THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING) {
+ log.warn("Helix task {} has been retried for {} times, please check
the config to see how we can handle this task better",
+ jobContext.getTaskIdForPartition(partition),
jobContext.getPartitionNumAttempts(partition));
+ }
+ if
(!UNUSUAL_HELIX_TASK_STATES.contains(jobContext.getPartitionState(partition))) {
+ return jobContext.getAssignedParticipant(partition);
+ }
+ // adding log here now for debugging
+ //todo: if this happens frequently, we should reset to status to
retriable or at least report the error earlier
+ log.info("Helix task {} is in {} state which is unexpected, please watch
out to see if this get recovered",
+ jobContext.getTaskIdForPartition(partition),
jobContext.getPartitionState(partition));
+ return null;
+ }
+
/**
* Iterate through the workflows configured in Helix to figure out the
number of required partitions
* and request the {@link YarnService} to scale to the desired number of
containers.
@@ -220,7 +241,10 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
int numPartitions = 0;
String jobTag = defaultHelixInstanceTags;
if (jobContext != null) {
-
inUseInstances.addAll(jobContext.getPartitionSet().stream().map(jobContext::getAssignedParticipant)
+ log.debug("JobContext {} num partitions {}", jobContext,
jobContext.getPartitionSet().size());
+
+ inUseInstances.addAll(jobContext.getPartitionSet().stream()
+ .map(i -> getInuseParticipantForHelixPartition(jobContext, i))
.filter(Objects::nonNull).collect(Collectors.toSet()));
numPartitions = jobContext.getPartitionSet().size();