This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 3dd82ac02 [GOBBLIN-1819] Log helix workflow information and timeout
information during submission wait / polling (#3681)
3dd82ac02 is described below
commit 3dd82ac02e838521a53649daae06d776647e76ff
Author: Matthew Ho <[email protected]>
AuthorDate: Mon Apr 24 11:32:41 2023 -0700
[GOBBLIN-1819] Log helix workflow information and timeout information
during submission wait / polling (#3681)
* [GOBBLIN-1819] Log helix workflow information and timeout information
during submission wait / polling
* Auto scaling manager logs
---
.../org/apache/gobblin/cluster/HelixUtils.java | 24 ++++++++++++----------
.../gobblin/yarn/YarnAutoScalingManager.java | 8 ++++----
.../java/org/apache/gobblin/yarn/YarnService.java | 6 ++++++
3 files changed, 23 insertions(+), 15 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 6efc6fff0..d62002967 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -120,10 +120,10 @@ public class HelixUtils {
static void waitJobInitialization(
HelixManager helixManager,
- String workFlowName,
+ String workflowName,
String jobName,
Duration timeout) throws Exception {
- WorkflowContext workflowContext =
TaskDriver.getWorkflowContext(helixManager, workFlowName);
+ WorkflowContext workflowContext =
TaskDriver.getWorkflowContext(helixManager, workflowName);
// If the helix job is deleted from some other thread or a completely
external process,
// method waitJobCompletion() needs to differentiate between the cases
where
@@ -131,18 +131,20 @@ public class HelixUtils {
// 2) it did get initialized but deleted soon after, in which case we
should stop waiting
// To overcome this issue, we wait here till workflowContext gets
initialized
long start = System.currentTimeMillis();
- while (workflowContext == null ||
workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName,
jobName)) == null) {
+ while (workflowContext == null ||
workflowContext.getJobState(TaskUtil.getNamespacedJobName(workflowName,
jobName)) == null) {
if (System.currentTimeMillis() - start > timeout.toMillis()) {
- log.error("Job cannot be initialized within {} milliseconds,
considered as an error", timeout.toMillis());
- throw new JobException(String.format("Job cannot be initialized within
%s milliseconds, considered as an error",
- timeout.toMillis()));
+ String errorDescription = String.format("Job cannot be initialized
within %s milliseconds, considered as an error. "
+ + "workflowName=%s, jobName=%s, timeSubmittedEpoch=%s",
timeout.toMillis(), workflowName, jobName, start);
+ log.error(errorDescription);
+ throw new JobException(errorDescription);
}
- workflowContext = TaskDriver.getWorkflowContext(helixManager,
workFlowName);
+ workflowContext = TaskDriver.getWorkflowContext(helixManager,
workflowName);
Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
- log.info("Waiting for work flow initialization.");
+ log.info("Waiting for workflow initialization. workflowName={},
jobName={}, timeSubmittedEpoch={}, timeoutSeconds={}",
+ workflowName, jobName, start, timeout.getSeconds());
}
- log.info("Work flow {} initialized", workFlowName);
+ log.info("Workflow {} initialized. timeToInitMs={}", workflowName,
System.currentTimeMillis() - start);
}
/**
@@ -235,11 +237,11 @@ public class HelixUtils {
Duration submissionTimeout) throws Exception {
WorkflowConfig workFlowConfig = new
WorkflowConfig.Builder().setExpiry(workFlowExpiryTime.getSeconds(),
TimeUnit.SECONDS).build();
- // Create a work flow for each job with the name being the queue name
+ // Create a workflow for each Gobblin job using the Gobblin job name as
the workflow name
Workflow workFlow = new
Workflow.Builder(workFlowName).setWorkflowConfig(workFlowConfig).addJob(jobName,
jobConfigBuilder).build();
// start the workflow
helixTaskDriver.start(workFlow);
- log.info("Created a work flow {}", workFlowName);
+ log.info("Created a workflow {}", workFlowName);
waitJobInitialization(helixManager, workFlowName, jobName,
submissionTimeout);
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index 7c4da8fd8..4ed63b744 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.yarn;
-import com.google.common.base.Strings;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.HashMap;
@@ -31,7 +30,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
@@ -47,12 +45,14 @@ import org.apache.helix.task.WorkflowContext;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
@@ -220,8 +220,6 @@ public class YarnAutoScalingManager extends AbstractIdleService {
int numPartitions = 0;
String jobTag = defaultHelixInstanceTags;
if (jobContext != null) {
- log.debug("JobContext {} num partitions {}", jobContext,
jobContext.getPartitionSet().size());
-
inUseInstances.addAll(jobContext.getPartitionSet().stream().map(jobContext::getAssignedParticipant)
.filter(Objects::nonNull).collect(Collectors.toSet()));
@@ -244,6 +242,8 @@ public class YarnAutoScalingManager extends AbstractIdleService {
// per partition. Scale the result by a constant overprovision
factor.
int containerCount = (int) Math.ceil(((double)numPartitions /
this.partitionsPerContainer) * this.overProvisionFactor);
yarnContainerRequestBundle.add(jobTag, containerCount, resource);
+ log.info("jobName={}, jobTag={}, numPartitions={},
targetNumContainers={}",
+ jobName, jobTag, numPartitions, containerCount);
}
}
// Find all participants appearing in this cluster. Note that Helix
instances can contain cluster-manager
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 5b6610f81..fe0d93d2d 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -1109,5 +1109,11 @@ public class YarnService extends AbstractIdleService {
this.helixTag = helixTag;
this.startupCommand = YarnService.this.buildContainerCommand(container,
helixParticipantId, helixTag);
}
+
+ @Override
+ public String toString() {
+ return String.format("ContainerInfo{ container=%s,
helixParticipantId=%s, helixTag=%s, startupCommand=%s }",
+ container.getId(), helixParticipantId, helixTag, startupCommand);
+ }
}
}