This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7e5e0ff [GOBBLIN-766] Emit Workunits Created event
7e5e0ff is described below
commit 7e5e0ff9eadc10206bacb90e4015ab834a7df2b4
Author: krraman <[email protected]>
AuthorDate: Tue May 28 13:48:26 2019 -0700
[GOBBLIN-766] Emit Workunits Created event
Emitting workunits created metric
Adding workunit created event in gobblin metric
docs
Fix typo in metadata
Emitting workunit created metric
Fixing indents
Update javadoc
Trigger build
Closes #2636 from krishraman/master
---
gobblin-docs/metrics/Metrics-for-Gobblin-ETL.md | 1 +
.../java/org/apache/gobblin/metrics/event/JobEvent.java | 1 +
.../java/org/apache/gobblin/metrics/TaggedTest.java | 17 +++++++++++------
.../org/apache/gobblin/runtime/AbstractJobLauncher.java | 8 ++++++++
4 files changed, 21 insertions(+), 6 deletions(-)
diff --git a/gobblin-docs/metrics/Metrics-for-Gobblin-ETL.md
b/gobblin-docs/metrics/Metrics-for-Gobblin-ETL.md
index 3ece6c7..4a2bfd3 100644
--- a/gobblin-docs/metrics/Metrics-for-Gobblin-ETL.md
+++ b/gobblin-docs/metrics/Metrics-for-Gobblin-ETL.md
@@ -93,6 +93,7 @@ Job Progression Events
* LockInUse: emitted if a job fails because it fails to get a lock.
* WorkUnitsMissing: emitted if a job exits because source failed to get work
units.
* WorkUnitsEmpty: emitted if a job exits because there were no work units to
process.
+* WorkUnitsCreated: emitted when workunits are created for a task. Metadata:
workUnitsCreated(number of bin-packed workunits created).
* TasksSubmitted: emitted when tasks are submitted for execution. Metadata:
tasksCount(number of tasks submitted).
* TaskFailed: emitted when a task fails. Metadata: taskId(id of the failed
task).
* Job_Successful: emitted at the end of a successful job.
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java
index d9310fb..55bf233 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java
@@ -31,6 +31,7 @@ public class JobEvent {
public static final String LOCK_IN_USE = "LockInUse";
public static final String WORK_UNITS_MISSING = "WorkUnitsMissing";
public static final String WORK_UNITS_EMPTY = "WorkUnitsEmpty";
+ public static final String WORK_UNITS_CREATED = "WorkUnitsCreated";
public static final String TASKS_SUBMITTED = "TasksSubmitted";
public static final String METADATA_JOB_ID = "jobId";
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TaggedTest.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TaggedTest.java
index 10c986e..eeb77a6 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TaggedTest.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/TaggedTest.java
@@ -38,6 +38,8 @@ public class TaggedTest {
private static final String JOB_ID = "TestJob-0";
private static final String PROJECT_VERSION_KEY = "project.version";
private static final int PROJECT_VERSION = 1;
+ private static final String WORK_UNITS_CREATED = "WorkUnitsCreated";
+ private static final int WORKUNITS = 5;
private Tagged tagged;
@@ -50,24 +52,27 @@ public class TaggedTest {
public void testAddTags() {
this.tagged.addTag(new Tag<String>(JOB_ID_KEY, JOB_ID));
this.tagged.addTag(new Tag<Integer>(PROJECT_VERSION_KEY, PROJECT_VERSION));
+ this.tagged.addTag(new Tag<Integer>(WORK_UNITS_CREATED, WORKUNITS));
}
@Test(dependsOnMethods = "testAddTags")
public void testGetTags() {
List<Tag<?>> tags = this.tagged.getTags();
- Assert.assertEquals(tags.size(), 2);
+ Assert.assertEquals(tags.size(), 3);
Assert.assertEquals(tags.get(0).getKey(), JOB_ID_KEY);
Assert.assertEquals(tags.get(0).getValue(), JOB_ID);
Assert.assertEquals(tags.get(1).getKey(), PROJECT_VERSION_KEY);
Assert.assertEquals(tags.get(1).getValue(), PROJECT_VERSION);
+ Assert.assertEquals(tags.get(2).getKey(), WORK_UNITS_CREATED);
+ Assert.assertEquals(tags.get(2).getValue(), WORKUNITS);
}
@Test(dependsOnMethods = "testAddTags")
public void testMetricNamePrefix() {
- Assert.assertEquals(
- this.tagged.metricNamePrefix(false), MetricRegistry.name(JOB_ID,
Integer.toString(PROJECT_VERSION)));
- Assert.assertEquals(
- this.tagged.metricNamePrefix(true),
- MetricRegistry.name(this.tagged.getTags().get(0).toString(),
this.tagged.getTags().get(1).toString()));
+ Assert.assertEquals(this.tagged.metricNamePrefix(false),
+ MetricRegistry.name(JOB_ID, Integer.toString(PROJECT_VERSION),
Integer.toString(WORKUNITS)));
+ Assert.assertEquals(this.tagged.metricNamePrefix(true),
+ MetricRegistry.name(this.tagged.getTags().get(0).toString(),
this.tagged.getTags().get(1).toString(),
+ this.tagged.getTags().get(2).toString()));
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 05603e5..a50a9dd 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.runtime;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -377,6 +378,13 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
jobState.setState(JobState.RunningState.COMMITTED);
isWorkUnitsEmpty = true;
return;
+ } else {
+ if (workUnitStream.isSafeToMaterialize()) {
+ this.eventSubmitter.submit(JobEvent.WORK_UNITS_CREATED,
"workUnitsCreated",
+
Long.toString(workUnitStream.getMaterializedWorkUnitCollection().size()));
+ LOG.info("Emitting WorkUnitsCreated Event Count: " +
workUnitStream.getMaterializedWorkUnitCollection().size());
+ }
+
}
//Initialize writer and converter(s)