This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e5e0ff  [GOBBLIN-766] Emit Workunits Created event
7e5e0ff is described below

commit 7e5e0ff9eadc10206bacb90e4015ab834a7df2b4
Author: krraman <[email protected]>
AuthorDate: Tue May 28 13:48:26 2019 -0700

    [GOBBLIN-766] Emit Workunits Created event
    
    Emitting workunits created metric
    
    Adding workunit created event in gobblin metric
    docs
    
    Fix typo in metadata
    
    Emitting workunit created metric
    
    Fixing indents
    
    Update javadoc
    
    Trigger build
    
    Closes #2636 from krishraman/master
---
 gobblin-docs/metrics/Metrics-for-Gobblin-ETL.md         |  1 +
 .../java/org/apache/gobblin/metrics/event/JobEvent.java |  1 +
 .../java/org/apache/gobblin/metrics/TaggedTest.java     | 17 +++++++++++------
 .../org/apache/gobblin/runtime/AbstractJobLauncher.java |  8 ++++++++
 4 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/gobblin-docs/metrics/Metrics-for-Gobblin-ETL.md b/gobblin-docs/metrics/Metrics-for-Gobblin-ETL.md
index 3ece6c7..4a2bfd3 100644
--- a/gobblin-docs/metrics/Metrics-for-Gobblin-ETL.md
+++ b/gobblin-docs/metrics/Metrics-for-Gobblin-ETL.md
@@ -93,6 +93,7 @@ Job Progression Events
 * LockInUse: emitted if a job fails because it fails to get a lock.
 * WorkUnitsMissing: emitted if a job exits because source failed to get work units.
 * WorkUnitsEmpty: emitted if a job exits because there were no work units to process.
+* WorkUnitsCreated: emitted when workunits are created for a task. Metadata: workUnitsCreated(number of bin-packed workunits created).
 * TasksSubmitted: emitted when tasks are submitted for execution. Metadata: tasksCount(number of tasks submitted).
 * TaskFailed: emitted when a task fails. Metadata: taskId(id of the failed task).
 * Job_Successful: emitted at the end of a successful job.
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java
index d9310fb..55bf233 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java
@@ -31,6 +31,7 @@ public class JobEvent {
   public static final String LOCK_IN_USE = "LockInUse";
   public static final String WORK_UNITS_MISSING = "WorkUnitsMissing";
   public static final String WORK_UNITS_EMPTY = "WorkUnitsEmpty";
+  public static final String WORK_UNITS_CREATED = "WorkUnitsCreated";
   public static final String TASKS_SUBMITTED = "TasksSubmitted";
 
   public static final String METADATA_JOB_ID = "jobId";
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TaggedTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TaggedTest.java
index 10c986e..eeb77a6 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TaggedTest.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TaggedTest.java
@@ -38,6 +38,8 @@ public class TaggedTest {
   private static final String JOB_ID = "TestJob-0";
   private static final String PROJECT_VERSION_KEY = "project.version";
   private static final int PROJECT_VERSION = 1;
+  private static final String WORK_UNITS_CREATED = "WorkUnitsCreated";
+  private static final int WORKUNITS = 5;
 
   private Tagged tagged;
 
@@ -50,24 +52,27 @@ public class TaggedTest {
   public void testAddTags() {
     this.tagged.addTag(new Tag<String>(JOB_ID_KEY, JOB_ID));
     this.tagged.addTag(new Tag<Integer>(PROJECT_VERSION_KEY, PROJECT_VERSION));
+    this.tagged.addTag(new Tag<Integer>(WORK_UNITS_CREATED, WORKUNITS));
   }
 
   @Test(dependsOnMethods = "testAddTags")
   public void testGetTags() {
     List<Tag<?>> tags = this.tagged.getTags();
-    Assert.assertEquals(tags.size(), 2);
+    Assert.assertEquals(tags.size(), 3);
     Assert.assertEquals(tags.get(0).getKey(), JOB_ID_KEY);
     Assert.assertEquals(tags.get(0).getValue(), JOB_ID);
     Assert.assertEquals(tags.get(1).getKey(), PROJECT_VERSION_KEY);
     Assert.assertEquals(tags.get(1).getValue(), PROJECT_VERSION);
+    Assert.assertEquals(tags.get(2).getKey(), WORK_UNITS_CREATED);
+    Assert.assertEquals(tags.get(2).getValue(), WORKUNITS);
   }
 
   @Test(dependsOnMethods = "testAddTags")
   public void testMetricNamePrefix() {
-    Assert.assertEquals(
-        this.tagged.metricNamePrefix(false), MetricRegistry.name(JOB_ID, Integer.toString(PROJECT_VERSION)));
-    Assert.assertEquals(
-        this.tagged.metricNamePrefix(true),
-        MetricRegistry.name(this.tagged.getTags().get(0).toString(), this.tagged.getTags().get(1).toString()));
+    Assert.assertEquals(this.tagged.metricNamePrefix(false),
+        MetricRegistry.name(JOB_ID, Integer.toString(PROJECT_VERSION), Integer.toString(WORKUNITS)));
+    Assert.assertEquals(this.tagged.metricNamePrefix(true),
+        MetricRegistry.name(this.tagged.getTags().get(0).toString(), this.tagged.getTags().get(1).toString(),
+            this.tagged.getTags().get(2).toString()));
   }
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 05603e5..a50a9dd 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.runtime;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -377,6 +378,13 @@ public abstract class AbstractJobLauncher implements JobLauncher {
           jobState.setState(JobState.RunningState.COMMITTED);
           isWorkUnitsEmpty = true;
           return;
+        } else {
+          if (workUnitStream.isSafeToMaterialize()) {
+            this.eventSubmitter.submit(JobEvent.WORK_UNITS_CREATED, "workUnitsCreated",
+                Long.toString(workUnitStream.getMaterializedWorkUnitCollection().size()));
+            LOG.info("Emitting WorkUnitsCreated Event Count: " + workUnitStream.getMaterializedWorkUnitCollection().size());
+          }
+
         }
 
         //Initialize writer and converter(s)

Reply via email to