This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 20f01ac0b Correct a log line and GTE with currect number of total task 
count (#3591)
20f01ac0b is described below

commit 20f01ac0beaa0791e397220d63e8e3623f44703a
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Nov 7 12:48:40 2022 -0600

    Correct a log line and GTE with currect number of total task count (#3591)
---
 .../org/apache/gobblin/runtime/CountUpAndDownLatch.java     | 13 +++++++++++--
 .../org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java | 10 ++++++----
 2 files changed, 17 insertions(+), 6 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CountUpAndDownLatch.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CountUpAndDownLatch.java
index 120536d17..a14ff4f00 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CountUpAndDownLatch.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CountUpAndDownLatch.java
@@ -21,6 +21,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import org.jetbrains.annotations.NotNull;
 
 
 /**
@@ -29,6 +31,7 @@ import java.util.concurrent.TimeoutException;
 class CountUpAndDownLatch extends CountDownLatch {
 
   private final Phaser phaser;
+  AtomicLong totalParties = new AtomicLong();
 
   public CountUpAndDownLatch(int count) {
     super(0);
@@ -48,7 +51,7 @@ class CountUpAndDownLatch extends CountDownLatch {
   }
 
   @Override
-  public boolean await(long timeout, TimeUnit unit) throws 
InterruptedException {
+  public boolean await(long timeout, @NotNull TimeUnit unit) throws 
InterruptedException {
     try {
       int phase = getPhase();
       this.phaser.awaitAdvanceInterruptibly(phase, timeout, unit);
@@ -71,6 +74,7 @@ class CountUpAndDownLatch extends CountDownLatch {
 
   public void countUp() {
     this.phaser.register();
+    totalParties.addAndGet(1);
   }
 
   @Override
@@ -78,12 +82,17 @@ class CountUpAndDownLatch extends CountDownLatch {
     return this.phaser.getUnarrivedParties();
   }
 
+  /**
+   * Because {@link #countDown()} de-registers a party. This method gives the 
same result as {@link #getCount()}.
+   * @return currently registered parties
+   */
+  @Deprecated
   public long getRegisteredParties() {
     return this.phaser.getRegisteredParties();
   }
 
   @Override
   public String toString() {
-    return "Unarrived parties: " + this.phaser.getUnarrivedParties();
+    return "Unarrived parties: " + this.phaser.getUnarrivedParties() + "/" + 
totalParties;
   }
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 17162a68b..7ebb77d11 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -198,14 +198,16 @@ public class GobblinMultiTaskAttempt {
           interruptTaskExecution(countDownLatch);
           break;
         }
-        log.info(String.format("%d out of %d tasks of job %s are running in 
container %s", countDownLatch.getCount(),
-            countDownLatch.getRegisteredParties(), jobId, 
containerIdOptional.or("")));
+        long totalTasks = countDownLatch.totalParties.get();
+        long runningTasks = countDownLatch.getCount();
+        log.info(String.format("%d out of %d tasks of job %s are running in 
container %s. %d tasks finished.",
+            runningTasks, totalTasks, jobId, containerIdOptional.or(""), 
totalTasks - runningTasks));
         if (countDownLatch.await(10, TimeUnit.SECONDS)) {
           break;
         }
       }
     } catch (InterruptedException interrupt) {
-      log.info("Job interrupted by InterrupedException.");
+      log.info("Job interrupted by InterruptedException.");
       interruptTaskExecution(countDownLatch);
     }
     log.info("All assigned tasks of job {} have completed in container {}", 
jobId, containerIdOptional.or(""));
@@ -502,7 +504,7 @@ public class GobblinMultiTaskAttempt {
     EventSubmitter.Builder eventSubmitterBuilder = new 
EventSubmitter.Builder(JobMetrics.get(this.jobId, new 
JobMetrics.CreatorTag(this.attemptId)).getMetricContext(),
         "gobblin.runtime");
     
eventSubmitterBuilder.addMetadata(this.taskEventMetadataGenerator.getMetadata(jobState,
 JobEvent.TASKS_SUBMITTED));
-    eventSubmitterBuilder.build().submit(JobEvent.TASKS_SUBMITTED, 
"tasksCount", Long.toString(countDownLatch.getRegisteredParties()));
+    eventSubmitterBuilder.build().submit(JobEvent.TASKS_SUBMITTED, 
"tasksCount", Integer.toString(tasks.size()));
 
     return new Pair<>(tasks, areAllTasksSubmitted);
   }

Reply via email to