This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3dd82ac02 [GOBBLIN-1819] Log helix workflow information and timeout information during submission wait / polling (#3681)
3dd82ac02 is described below

commit 3dd82ac02e838521a53649daae06d776647e76ff
Author: Matthew Ho <[email protected]>
AuthorDate: Mon Apr 24 11:32:41 2023 -0700

    [GOBBLIN-1819] Log helix workflow information and timeout information during submission wait / polling (#3681)
    
    * [GOBBLIN-1819] Log helix workflow information and timeout information during submission wait / polling
    
    * Auto scaling manager logs
---
 .../org/apache/gobblin/cluster/HelixUtils.java     | 24 ++++++++++++----------
 .../gobblin/yarn/YarnAutoScalingManager.java       |  8 ++++----
 .../java/org/apache/gobblin/yarn/YarnService.java  |  6 ++++++
 3 files changed, 23 insertions(+), 15 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 6efc6fff0..d62002967 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -120,10 +120,10 @@ public class HelixUtils {
 
   static void waitJobInitialization(
       HelixManager helixManager,
-      String workFlowName,
+      String workflowName,
       String jobName,
       Duration timeout) throws Exception {
-    WorkflowContext workflowContext = 
TaskDriver.getWorkflowContext(helixManager, workFlowName);
+    WorkflowContext workflowContext = 
TaskDriver.getWorkflowContext(helixManager, workflowName);
 
     // If the helix job is deleted from some other thread or a completely 
external process,
     // method waitJobCompletion() needs to differentiate between the cases 
where
@@ -131,18 +131,20 @@ public class HelixUtils {
     // 2) it did get initialized but deleted soon after, in which case we 
should stop waiting
     // To overcome this issue, we wait here till workflowContext gets 
initialized
     long start = System.currentTimeMillis();
-    while (workflowContext == null || 
workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, 
jobName)) == null) {
+    while (workflowContext == null || 
workflowContext.getJobState(TaskUtil.getNamespacedJobName(workflowName, 
jobName)) == null) {
       if (System.currentTimeMillis() - start > timeout.toMillis()) {
-        log.error("Job cannot be initialized within {} milliseconds, 
considered as an error", timeout.toMillis());
-        throw new JobException(String.format("Job cannot be initialized within 
%s milliseconds, considered as an error",
-            timeout.toMillis()));
+        String errorDescription = String.format("Job cannot be initialized 
within %s milliseconds, considered as an error. "
+                + "workflowName=%s, jobName=%s, timeSubmittedEpoch=%s", 
timeout.toMillis(), workflowName, jobName, start);
+        log.error(errorDescription);
+        throw new JobException(errorDescription);
       }
-      workflowContext = TaskDriver.getWorkflowContext(helixManager, 
workFlowName);
+      workflowContext = TaskDriver.getWorkflowContext(helixManager, 
workflowName);
       Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
-      log.info("Waiting for work flow initialization.");
+      log.info("Waiting for workflow initialization. workflowName={}, 
jobName={}, timeSubmittedEpoch={}, timeoutSeconds={}",
+          workflowName, jobName, start, timeout.getSeconds());
     }
 
-    log.info("Work flow {} initialized", workFlowName);
+    log.info("Workflow {} initialized. timeToInitMs={}", workflowName, 
System.currentTimeMillis() - start);
   }
 
   /**
@@ -235,11 +237,11 @@ public class HelixUtils {
       Duration submissionTimeout) throws Exception {
 
     WorkflowConfig workFlowConfig = new 
WorkflowConfig.Builder().setExpiry(workFlowExpiryTime.getSeconds(), 
TimeUnit.SECONDS).build();
-    // Create a work flow for each job with the name being the queue name
+    // Create a workflow for each Gobblin job using the Gobblin job name as 
the workflow name
     Workflow workFlow = new 
Workflow.Builder(workFlowName).setWorkflowConfig(workFlowConfig).addJob(jobName,
 jobConfigBuilder).build();
     // start the workflow
     helixTaskDriver.start(workFlow);
-    log.info("Created a work flow {}", workFlowName);
+    log.info("Created a workflow {}", workFlowName);
 
     waitJobInitialization(helixManager, workFlowName, jobName, 
submissionTimeout);
   }
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index 7c4da8fd8..4ed63b744 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.yarn;
 
-import com.google.common.base.Strings;
 import java.util.ArrayDeque;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -31,7 +30,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -47,12 +45,14 @@ import org.apache.helix.task.WorkflowContext;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
 
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
 
@@ -220,8 +220,6 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
           int numPartitions = 0;
           String jobTag = defaultHelixInstanceTags;
           if (jobContext != null) {
-            log.debug("JobContext {} num partitions {}", jobContext, 
jobContext.getPartitionSet().size());
-
             
inUseInstances.addAll(jobContext.getPartitionSet().stream().map(jobContext::getAssignedParticipant)
                 .filter(Objects::nonNull).collect(Collectors.toSet()));
 
@@ -244,6 +242,8 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
           // per partition. Scale the result by a constant overprovision 
factor.
           int containerCount = (int) Math.ceil(((double)numPartitions / 
this.partitionsPerContainer) * this.overProvisionFactor);
           yarnContainerRequestBundle.add(jobTag, containerCount, resource);
+          log.info("jobName={}, jobTag={}, numPartitions={}, 
targetNumContainers={}",
+              jobName, jobTag, numPartitions, containerCount);
         }
       }
       // Find all participants appearing in this cluster. Note that Helix 
instances can contain cluster-manager
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 5b6610f81..fe0d93d2d 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -1109,5 +1109,11 @@ public class YarnService extends AbstractIdleService {
       this.helixTag = helixTag;
       this.startupCommand = YarnService.this.buildContainerCommand(container, 
helixParticipantId, helixTag);
     }
+
+    @Override
+    public String toString() {
+      return String.format("ContainerInfo{ container=%s, 
helixParticipantId=%s, helixTag=%s, startupCommand=%s }",
+          container.getId(), helixParticipantId, helixTag, startupCommand);
+    }
   }
 }

Reply via email to