This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 20f01ac0b Correct a log line and GTE with correct number of total task
count (#3591)
20f01ac0b is described below
commit 20f01ac0beaa0791e397220d63e8e3623f44703a
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Nov 7 12:48:40 2022 -0600
Correct a log line and GTE with correct number of total task count (#3591)
---
.../org/apache/gobblin/runtime/CountUpAndDownLatch.java | 13 +++++++++++--
.../org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java | 10 ++++++----
2 files changed, 17 insertions(+), 6 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CountUpAndDownLatch.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CountUpAndDownLatch.java
index 120536d17..a14ff4f00 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CountUpAndDownLatch.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CountUpAndDownLatch.java
@@ -21,6 +21,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import org.jetbrains.annotations.NotNull;
/**
@@ -29,6 +31,7 @@ import java.util.concurrent.TimeoutException;
class CountUpAndDownLatch extends CountDownLatch {
private final Phaser phaser;
+ AtomicLong totalParties = new AtomicLong();
public CountUpAndDownLatch(int count) {
super(0);
@@ -48,7 +51,7 @@ class CountUpAndDownLatch extends CountDownLatch {
}
@Override
- public boolean await(long timeout, TimeUnit unit) throws
InterruptedException {
+ public boolean await(long timeout, @NotNull TimeUnit unit) throws
InterruptedException {
try {
int phase = getPhase();
this.phaser.awaitAdvanceInterruptibly(phase, timeout, unit);
@@ -71,6 +74,7 @@ class CountUpAndDownLatch extends CountDownLatch {
public void countUp() {
this.phaser.register();
+ totalParties.addAndGet(1);
}
@Override
@@ -78,12 +82,17 @@ class CountUpAndDownLatch extends CountDownLatch {
return this.phaser.getUnarrivedParties();
}
+ /**
+ * Because {@link #countDown()} de-registers a party, this method gives the
same result as {@link #getCount()}.
+ * @return currently registered parties
+ */
+ @Deprecated
public long getRegisteredParties() {
return this.phaser.getRegisteredParties();
}
@Override
public String toString() {
- return "Unarrived parties: " + this.phaser.getUnarrivedParties();
+ return "Unarrived parties: " + this.phaser.getUnarrivedParties() + "/" +
totalParties;
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 17162a68b..7ebb77d11 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -198,14 +198,16 @@ public class GobblinMultiTaskAttempt {
interruptTaskExecution(countDownLatch);
break;
}
- log.info(String.format("%d out of %d tasks of job %s are running in
container %s", countDownLatch.getCount(),
- countDownLatch.getRegisteredParties(), jobId,
containerIdOptional.or("")));
+ long totalTasks = countDownLatch.totalParties.get();
+ long runningTasks = countDownLatch.getCount();
+ log.info(String.format("%d out of %d tasks of job %s are running in
container %s. %d tasks finished.",
+ runningTasks, totalTasks, jobId, containerIdOptional.or(""),
totalTasks - runningTasks));
if (countDownLatch.await(10, TimeUnit.SECONDS)) {
break;
}
}
} catch (InterruptedException interrupt) {
- log.info("Job interrupted by InterrupedException.");
+ log.info("Job interrupted by InterruptedException.");
interruptTaskExecution(countDownLatch);
}
log.info("All assigned tasks of job {} have completed in container {}",
jobId, containerIdOptional.or(""));
@@ -502,7 +504,7 @@ public class GobblinMultiTaskAttempt {
EventSubmitter.Builder eventSubmitterBuilder = new
EventSubmitter.Builder(JobMetrics.get(this.jobId, new
JobMetrics.CreatorTag(this.attemptId)).getMetricContext(),
"gobblin.runtime");
eventSubmitterBuilder.addMetadata(this.taskEventMetadataGenerator.getMetadata(jobState,
JobEvent.TASKS_SUBMITTED));
- eventSubmitterBuilder.build().submit(JobEvent.TASKS_SUBMITTED,
"tasksCount", Long.toString(countDownLatch.getRegisteredParties()));
+ eventSubmitterBuilder.build().submit(JobEvent.TASKS_SUBMITTED,
"tasksCount", Integer.toString(tasks.size()));
return new Pair<>(tasks, areAllTasksSubmitted);
}