This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c358719  [GOBBLIN-766] Emit WorkUnitsCreated Count Event for MR deployed jobs.
c358719 is described below

commit c35871920cc5f8378a4f9962658b840994f071f5
Author: krraman <[email protected]>
AuthorDate: Wed Sep 11 16:30:36 2019 -0700

    [GOBBLIN-766] Emit WorkUnitsCreated Count Event for MR deployed jobs.
    
    Closes #2706 from krishraman/master
---
 .../gobblin/cluster/GobblinHelixJobLauncher.java   |  6 ++
 .../resources/GobblinHelixJobLauncherTest.conf     |  1 +
 gobblin-docs/metrics/Metrics-for-Gobblin-ETL.md    |  1 +
 .../org/apache/gobblin/metrics/event/JobEvent.java |  1 +
 .../gobblin/runtime/GobblinMultiTaskAttempt.java   | 75 +++++++++-------------
 .../gobblin/runtime/local/LocalJobLauncher.java    | 10 ++-
 .../gobblin/runtime/mapreduce/MRJobLauncher.java   |  7 ++
 7 files changed, 55 insertions(+), 46 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index d37dab1..f3c63f9 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -27,6 +27,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.gobblin.metrics.event.CountEventBuilder;
+import org.apache.gobblin.metrics.event.JobEvent;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -199,6 +201,10 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
   @Override
   protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
     try {
+      CountEventBuilder countEventBuilder = new 
CountEventBuilder(JobEvent.WORK_UNITS_CREATED, workUnits.size());
+      this.eventSubmitter.submit(countEventBuilder);
+      LOGGER.info("Emitting WorkUnitsCreated Count: " + 
countEventBuilder.getCount());
+
       long workUnitStartTime = System.currentTimeMillis();
       workUnits.forEach((k) -> 
k.setProp(ConfigurationKeys.WORK_UNIT_CREATION_TIME_IN_MILLIS, 
workUnitStartTime));
 
diff --git a/gobblin-cluster/src/test/resources/GobblinHelixJobLauncherTest.conf b/gobblin-cluster/src/test/resources/GobblinHelixJobLauncherTest.conf
index 6251c3d..69c67cd 100644
--- a/gobblin-cluster/src/test/resources/GobblinHelixJobLauncherTest.conf
+++ b/gobblin-cluster/src/test/resources/GobblinHelixJobLauncherTest.conf
@@ -31,3 +31,4 @@ writer.file.name="foo.avro"
 writer.file.path=avro
 writer.builder.class=org.apache.gobblin.writer.AvroDataWriterBuilder
 source.schema="{\"namespace\":\"example.avro\", \"type\":\"record\", 
\"name\":\"User\", \"fields\":[{\"name\":\"name\", \"type\":\"string\"}, 
{\"name\":\"favorite_number\",  \"type\":\"int\"}, 
{\"name\":\"favorite_color\", \"type\":\"string\"}]}"
+metrics.enabled=true
\ No newline at end of file
diff --git a/gobblin-docs/metrics/Metrics-for-Gobblin-ETL.md b/gobblin-docs/metrics/Metrics-for-Gobblin-ETL.md
index 3ece6c7..5388c7b 100644
--- a/gobblin-docs/metrics/Metrics-for-Gobblin-ETL.md
+++ b/gobblin-docs/metrics/Metrics-for-Gobblin-ETL.md
@@ -93,6 +93,7 @@ Job Progression Events
 * LockInUse: emitted if a job fails because it fails to get a lock.
 * WorkUnitsMissing: emitted if a job exits because source failed to get work 
units.
 * WorkUnitsEmpty: emitted if a job exits because there were no work units to 
process.
+* WorkUnitsCreated: emitted when workunits are created for a task. Metadata: 
workUnitsCreated(Number of bin-packed workunits created).
 * TasksSubmitted: emitted when tasks are submitted for execution. Metadata: 
tasksCount(number of tasks submitted).
 * TaskFailed: emitted when a task fails. Metadata: taskId(id of the failed 
task).
 * Job_Successful: emitted at the end of a successful job.
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java
index 3f35b73..f8c5e49 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java
@@ -31,6 +31,7 @@ public class JobEvent {
   public static final String LOCK_IN_USE = "LockInUse";
   public static final String WORK_UNITS_MISSING = "WorkUnitsMissing";
   public static final String WORK_UNITS_EMPTY = "WorkUnitsEmpty";
+  public static final String WORK_UNITS_CREATED = "WorkUnitsCreated";
   public static final String TASKS_SUBMITTED = "TasksSubmitted";
 
   public static final String METADATA_JOB_ID = "jobId";
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 3fdf2d8..fc98beb 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -106,14 +106,9 @@ public class GobblinMultiTaskAttempt {
    */
   private List<CommitStep> cleanupCommitSteps;
 
-  public GobblinMultiTaskAttempt(Iterator<WorkUnit> workUnits,
-                                 String jobId,
-                                 JobState jobState,
-                                 TaskStateTracker taskStateTracker,
-                                 TaskExecutor taskExecutor,
-                                 Optional<String> containerIdOptional,
-                                 Optional<StateStore<TaskState>> 
taskStateStoreOptional,
-                                 SharedResourcesBroker<GobblinScopeTypes> 
jobBroker) {
+  public GobblinMultiTaskAttempt(Iterator<WorkUnit> workUnits, String jobId, 
JobState jobState,
+      TaskStateTracker taskStateTracker, TaskExecutor taskExecutor, 
Optional<String> containerIdOptional,
+      Optional<StateStore<TaskState>> taskStateStoreOptional, 
SharedResourcesBroker<GobblinScopeTypes> jobBroker) {
     super();
     this.workUnits = workUnits;
     this.jobId = jobId;
@@ -123,8 +118,8 @@ public class GobblinMultiTaskAttempt {
     this.taskExecutor = taskExecutor;
     this.containerIdOptional = containerIdOptional;
     this.taskStateStoreOptional = taskStateStoreOptional;
-    this.log = LoggerFactory.getLogger(GobblinMultiTaskAttempt.class.getName() 
+ "-" +
-               containerIdOptional.or("noattempt"));
+    this.log =
+        LoggerFactory.getLogger(GobblinMultiTaskAttempt.class.getName() + "-" 
+ containerIdOptional.or("noattempt"));
     this.jobBroker = jobBroker;
     this.tasks = new ArrayList<>();
   }
@@ -134,8 +129,7 @@ public class GobblinMultiTaskAttempt {
    * @throws IOException
    * @throws InterruptedException
    */
-  public void run()
-      throws IOException, InterruptedException {
+  public void run() throws IOException, InterruptedException {
     if (!this.workUnits.hasNext()) {
       log.warn("No work units to run in container " + 
containerIdOptional.or(""));
       return;
@@ -143,8 +137,7 @@ public class GobblinMultiTaskAttempt {
 
     CountUpAndDownLatch countDownLatch = new CountUpAndDownLatch(0);
     this.tasks = runWorkUnits(countDownLatch);
-    log.info("Waiting for submitted tasks of job {} to complete in container 
{}...", jobId,
-        containerIdOptional.or(""));
+    log.info("Waiting for submitted tasks of job {} to complete in container 
{}...", jobId, containerIdOptional.or(""));
     try {
       while (countDownLatch.getCount() > 0) {
         if (this.interruptionPredicate.test(this)) {
@@ -183,8 +176,7 @@ public class GobblinMultiTaskAttempt {
    * 3. persist task statestore.
    * @throws IOException
    */
-  public void commit()
-      throws IOException {
+  public void commit() throws IOException {
     if (this.tasks == null || this.tasks.isEmpty()) {
       log.warn("No tasks to be committed in container " + 
containerIdOptional.or(""));
       return;
@@ -196,8 +188,7 @@ public class GobblinMultiTaskAttempt {
             return new Callable<Void>() {
               @Nullable
               @Override
-              public Void call()
-                  throws Exception {
+              public Void call() throws Exception {
                 task.commit();
                 return null;
               }
@@ -208,8 +199,8 @@ public class GobblinMultiTaskAttempt {
     try {
       List<Either<Void, ExecutionException>> executionResults =
           new IteratorExecutor<>(callableIterator, 
this.getTaskCommitThreadPoolSize(),
-              ExecutorsUtils.newDaemonThreadFactory(Optional.of(log), 
Optional.of("Task-committing-pool-%d")))
-              .executeAndGetResults();
+              ExecutorsUtils.newDaemonThreadFactory(Optional.of(log),
+                  
Optional.of("Task-committing-pool-%d"))).executeAndGetResults();
       IteratorExecutor.logFailures(executionResults, log, 10);
     } catch (InterruptedException ie) {
       log.error("Committing of tasks interrupted. Aborting.");
@@ -229,10 +220,9 @@ public class GobblinMultiTaskAttempt {
    * A method that shuts down all running tasks managed by this instance.
    * TODO: Call this from the right place.
    */
-  public void shutdownTasks()
-      throws InterruptedException {
+  public void shutdownTasks() throws InterruptedException {
     log.info("Shutting down tasks");
-    for (Task task: this.tasks) {
+    for (Task task : this.tasks) {
       task.shutdown();
     }
 
@@ -249,8 +239,7 @@ public class GobblinMultiTaskAttempt {
     }
   }
 
-  private void persistTaskStateStore()
-      throws IOException {
+  private void persistTaskStateStore() throws IOException {
     if (!this.taskStateStoreOptional.isPresent()) {
       log.info("Task state store does not exist.");
       return;
@@ -286,8 +275,8 @@ public class GobblinMultiTaskAttempt {
 
         // If there are task failures then the tasks may be reattempted. Save 
a copy of the task state that is used
         // to filter out successful tasks on subsequent attempts.
-        if (task.getTaskState().getWorkingState() == 
WorkUnitState.WorkingState.SUCCESSFUL ||
-            task.getTaskState().getWorkingState() == 
WorkUnitState.WorkingState.COMMITTED) {
+        if (task.getTaskState().getWorkingState() == 
WorkUnitState.WorkingState.SUCCESSFUL
+            || task.getTaskState().getWorkingState() == 
WorkUnitState.WorkingState.COMMITTED) {
           taskStateStore.put(task.getJobId(), task.getTaskId() + 
TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX,
               task.getTaskState());
         }
@@ -417,8 +406,8 @@ public class GobblinMultiTaskAttempt {
       }
     }
 
-    new EventSubmitter.Builder(JobMetrics.get(this.jobId, new 
JobMetrics.CreatorTag(this.attemptId)).getMetricContext(), "gobblin.runtime")
-        .build()
+    new EventSubmitter.Builder(JobMetrics.get(this.jobId, new 
JobMetrics.CreatorTag(this.attemptId)).getMetricContext(),
+        "gobblin.runtime").build()
         .submit(JobEvent.TASKS_SUBMITTED, "tasksCount", 
Long.toString(countDownLatch.getRegisteredParties()));
 
     return tasks;
@@ -432,21 +421,20 @@ public class GobblinMultiTaskAttempt {
 
     this.log.info("Heap Memory");
     this.log.info(String.format(format, "init", "used", "Committed", "max"));
-    this.log.info(String.format(format, heapMemory.getInit(), 
heapMemory.getUsed(),
-        heapMemory.getCommitted(), heapMemory.getMax()));
+    this.log.info(String.format(format, heapMemory.getInit(), 
heapMemory.getUsed(), heapMemory.getCommitted(),
+        heapMemory.getMax()));
 
     this.log.info("Non-heap Memory");
     this.log.info(String.format(format, "init", "used", "Committed", "max"));
-    this.log.info(String.format(format, nonHeapMemory.getInit(), 
nonHeapMemory.getUsed(),
-        nonHeapMemory.getCommitted(), nonHeapMemory.getMax()));
+    this.log.info(String.format(format, nonHeapMemory.getInit(), 
nonHeapMemory.getUsed(), nonHeapMemory.getCommitted(),
+        nonHeapMemory.getMax()));
   }
 
-
   private Task createTaskRunnable(WorkUnitState workUnitState, CountDownLatch 
countDownLatch) {
     Optional<TaskFactory> taskFactoryOpt = 
TaskUtils.getTaskFactory(workUnitState);
     if (taskFactoryOpt.isPresent()) {
-      return new TaskIFaceWrapper(taskFactoryOpt.get().createTask(new 
TaskContext(workUnitState)), new TaskContext(workUnitState),
-          countDownLatch, this.taskStateTracker);
+      return new TaskIFaceWrapper(taskFactoryOpt.get().createTask(new 
TaskContext(workUnitState)),
+          new TaskContext(workUnitState), countDownLatch, 
this.taskStateTracker);
     } else {
       return new Task(new TaskContext(workUnitState), this.taskStateTracker, 
this.taskExecutor,
           Optional.of(countDownLatch));
@@ -477,7 +465,7 @@ public class GobblinMultiTaskAttempt {
    * {@link JobMetrics#attemptRemove(String, Tag)}.
    */
   public void cleanMetrics() {
-    tasks.forEach(task-> {
+    tasks.forEach(task -> {
       TaskMetrics.remove(task);
       JobMetrics.attemptRemove(this.jobId, new 
JobMetrics.CreatorTag(task.getTaskId()));
     });
@@ -491,12 +479,12 @@ public class GobblinMultiTaskAttempt {
    * updating the task state.
    */
   public static GobblinMultiTaskAttempt runWorkUnits(JobContext jobContext, 
Iterator<WorkUnit> workUnits,
-      TaskStateTracker taskStateTracker, TaskExecutor taskExecutor,
-      CommitPolicy multiTaskAttemptCommitPolicy)
+      TaskStateTracker taskStateTracker, TaskExecutor taskExecutor, 
CommitPolicy multiTaskAttemptCommitPolicy)
       throws IOException, InterruptedException {
     GobblinMultiTaskAttempt multiTaskAttempt =
-        new GobblinMultiTaskAttempt(workUnits, jobContext.getJobId(), 
jobContext.getJobState(), taskStateTracker, taskExecutor,
-            Optional.<String>absent(), 
Optional.<StateStore<TaskState>>absent(), jobContext.getJobBroker());
+        new GobblinMultiTaskAttempt(workUnits, jobContext.getJobId(), 
jobContext.getJobState(), taskStateTracker,
+            taskExecutor, Optional.<String>absent(), 
Optional.<StateStore<TaskState>>absent(),
+            jobContext.getJobBroker());
     
multiTaskAttempt.runAndOptionallyCommitTaskAttempt(multiTaskAttemptCommitPolicy);
     return multiTaskAttempt;
   }
@@ -519,9 +507,8 @@ public class GobblinMultiTaskAttempt {
    */
   public static GobblinMultiTaskAttempt runWorkUnits(String jobId, String 
containerId, JobState jobState,
       List<WorkUnit> workUnits, TaskStateTracker taskStateTracker, 
TaskExecutor taskExecutor,
-      StateStore<TaskState> taskStateStore,
-      CommitPolicy multiTaskAttemptCommitPolicy, 
SharedResourcesBroker<GobblinScopeTypes> jobBroker,
-      Predicate<GobblinMultiTaskAttempt> interruptionPredicate)
+      StateStore<TaskState> taskStateStore, CommitPolicy 
multiTaskAttemptCommitPolicy,
+      SharedResourcesBroker<GobblinScopeTypes> jobBroker, 
Predicate<GobblinMultiTaskAttempt> interruptionPredicate)
       throws IOException, InterruptedException {
 
     // dump the work unit if tracking logs are enabled
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
index 6b6bc16..bcebb73 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
@@ -24,6 +24,8 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.gobblin.metrics.event.CountEventBuilder;
+import org.apache.gobblin.metrics.event.JobEvent;
 import org.apache.gobblin.runtime.job.JobInterruptionPredicate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,6 +72,8 @@ public class LocalJobLauncher extends AbstractJobLauncher {
   // Service manager to manage dependent services
   private final ServiceManager serviceManager;
 
+  private Integer workUnitCount;
+
   public LocalJobLauncher(Properties jobProps) throws Exception {
     this(jobProps, null, ImmutableList.of());
   }
@@ -124,18 +128,21 @@ public class LocalJobLauncher extends AbstractJobLauncher 
{
 
   @Override
   protected void runWorkUnitStream(WorkUnitStream workUnitStream) throws 
Exception {
+
     String jobId = this.jobContext.getJobId();
     final JobState jobState = this.jobContext.getJobState();
 
     Iterator<WorkUnit> workUnitIterator = workUnitStream.getWorkUnits();
     if (!workUnitIterator.hasNext()) {
       LOG.warn("No work units to run");
+      CountEventBuilder countEventBuilder = new 
CountEventBuilder(JobEvent.WORK_UNITS_EMPTY, 0);
+      this.eventSubmitter.submit(countEventBuilder);
       return;
     }
 
-
     TimingEvent workUnitsRunTimer = 
this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.WORK_UNITS_RUN);
     Iterator<WorkUnit> flattenedWorkUnits = new 
MultiWorkUnitUnpackingIterator(workUnitStream.getWorkUnits());
+
     Iterator<WorkUnit> workUnitsWithJobState = 
Iterators.transform(flattenedWorkUnits, new Function<WorkUnit, WorkUnit>() {
       @Override
       public WorkUnit apply(WorkUnit workUnit) {
@@ -143,7 +150,6 @@ public class LocalJobLauncher extends AbstractJobLauncher {
         return workUnit;
       }
     });
-
     Thread thisThread = Thread.currentThread();
     JobInterruptionPredicate jobInterruptionPredicate =
         new JobInterruptionPredicate(jobState, () -> thisThread.interrupt(), 
true);
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index 0a2a284..0077d0f 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -29,6 +29,9 @@ import java.util.concurrent.TimeoutException;
 
 import org.apache.gobblin.fsm.FiniteStateMachine;
 import org.apache.gobblin.fsm.StateWithCallbacks;
+import org.apache.gobblin.metrics.event.CountEventBuilder;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.JobEvent;
 import org.apache.gobblin.metrics.event.JobStateEventBuilder;
 import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine;
 import org.apache.hadoop.conf.Configuration;
@@ -262,6 +265,10 @@ public class MRJobLauncher extends AbstractJobLauncher {
     JobState jobState = this.jobContext.getJobState();
 
     try {
+      CountEventBuilder countEventBuilder = new 
CountEventBuilder(JobEvent.WORK_UNITS_CREATED, workUnits.size());
+      this.eventSubmitter.submit(countEventBuilder);
+      LOG.info("Emitting WorkUnitsCreated Count: " + 
countEventBuilder.getCount());
+
       prepareHadoopJob(workUnits);
 
       // Start the output TaskState collector service

Reply via email to