This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c358719 [GOBBLIN-766] Emit WorkUnitsCreated Count Event for MR deployed jobs.
c358719 is described below
commit c35871920cc5f8378a4f9962658b840994f071f5
Author: krraman <[email protected]>
AuthorDate: Wed Sep 11 16:30:36 2019 -0700
[GOBBLIN-766] Emit WorkUnitsCreated Count Event for MR deployed jobs.
Closes #2706 from krishraman/master
---
.../gobblin/cluster/GobblinHelixJobLauncher.java | 6 ++
.../resources/GobblinHelixJobLauncherTest.conf | 1 +
gobblin-docs/metrics/Metrics-for-Gobblin-ETL.md | 1 +
.../org/apache/gobblin/metrics/event/JobEvent.java | 1 +
.../gobblin/runtime/GobblinMultiTaskAttempt.java | 75 +++++++++-------------
.../gobblin/runtime/local/LocalJobLauncher.java | 10 ++-
.../gobblin/runtime/mapreduce/MRJobLauncher.java | 7 ++
7 files changed, 55 insertions(+), 46 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index d37dab1..f3c63f9 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -27,6 +27,8 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
+import org.apache.gobblin.metrics.event.CountEventBuilder;
+import org.apache.gobblin.metrics.event.JobEvent;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -199,6 +201,10 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
@Override
protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
try {
+ CountEventBuilder countEventBuilder = new
CountEventBuilder(JobEvent.WORK_UNITS_CREATED, workUnits.size());
+ this.eventSubmitter.submit(countEventBuilder);
+ LOGGER.info("Emitting WorkUnitsCreated Count: " +
countEventBuilder.getCount());
+
long workUnitStartTime = System.currentTimeMillis();
workUnits.forEach((k) ->
k.setProp(ConfigurationKeys.WORK_UNIT_CREATION_TIME_IN_MILLIS,
workUnitStartTime));
diff --git a/gobblin-cluster/src/test/resources/GobblinHelixJobLauncherTest.conf b/gobblin-cluster/src/test/resources/GobblinHelixJobLauncherTest.conf
index 6251c3d..69c67cd 100644
--- a/gobblin-cluster/src/test/resources/GobblinHelixJobLauncherTest.conf
+++ b/gobblin-cluster/src/test/resources/GobblinHelixJobLauncherTest.conf
@@ -31,3 +31,4 @@ writer.file.name="foo.avro"
writer.file.path=avro
writer.builder.class=org.apache.gobblin.writer.AvroDataWriterBuilder
source.schema="{\"namespace\":\"example.avro\", \"type\":\"record\",
\"name\":\"User\", \"fields\":[{\"name\":\"name\", \"type\":\"string\"},
{\"name\":\"favorite_number\", \"type\":\"int\"},
{\"name\":\"favorite_color\", \"type\":\"string\"}]}"
+metrics.enabled=true
\ No newline at end of file
diff --git a/gobblin-docs/metrics/Metrics-for-Gobblin-ETL.md b/gobblin-docs/metrics/Metrics-for-Gobblin-ETL.md
index 3ece6c7..5388c7b 100644
--- a/gobblin-docs/metrics/Metrics-for-Gobblin-ETL.md
+++ b/gobblin-docs/metrics/Metrics-for-Gobblin-ETL.md
@@ -93,6 +93,7 @@ Job Progression Events
* LockInUse: emitted if a job fails because it fails to get a lock.
* WorkUnitsMissing: emitted if a job exits because source failed to get work
units.
* WorkUnitsEmpty: emitted if a job exits because there were no work units to
process.
+* WorkUnitsCreated: emitted when workunits are created for a task. Metadata:
workUnitsCreated(Number of bin-packed workunits created).
* TasksSubmitted: emitted when tasks are submitted for execution. Metadata:
tasksCount(number of tasks submitted).
* TaskFailed: emitted when a task fails. Metadata: taskId(id of the failed
task).
* Job_Successful: emitted at the end of a successful job.
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java
index 3f35b73..f8c5e49 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java
@@ -31,6 +31,7 @@ public class JobEvent {
public static final String LOCK_IN_USE = "LockInUse";
public static final String WORK_UNITS_MISSING = "WorkUnitsMissing";
public static final String WORK_UNITS_EMPTY = "WorkUnitsEmpty";
+ public static final String WORK_UNITS_CREATED = "WorkUnitsCreated";
public static final String TASKS_SUBMITTED = "TasksSubmitted";
public static final String METADATA_JOB_ID = "jobId";
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 3fdf2d8..fc98beb 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -106,14 +106,9 @@ public class GobblinMultiTaskAttempt {
*/
private List<CommitStep> cleanupCommitSteps;
- public GobblinMultiTaskAttempt(Iterator<WorkUnit> workUnits,
- String jobId,
- JobState jobState,
- TaskStateTracker taskStateTracker,
- TaskExecutor taskExecutor,
- Optional<String> containerIdOptional,
- Optional<StateStore<TaskState>>
taskStateStoreOptional,
- SharedResourcesBroker<GobblinScopeTypes>
jobBroker) {
+ public GobblinMultiTaskAttempt(Iterator<WorkUnit> workUnits, String jobId,
JobState jobState,
+ TaskStateTracker taskStateTracker, TaskExecutor taskExecutor,
Optional<String> containerIdOptional,
+ Optional<StateStore<TaskState>> taskStateStoreOptional,
SharedResourcesBroker<GobblinScopeTypes> jobBroker) {
super();
this.workUnits = workUnits;
this.jobId = jobId;
@@ -123,8 +118,8 @@ public class GobblinMultiTaskAttempt {
this.taskExecutor = taskExecutor;
this.containerIdOptional = containerIdOptional;
this.taskStateStoreOptional = taskStateStoreOptional;
- this.log = LoggerFactory.getLogger(GobblinMultiTaskAttempt.class.getName()
+ "-" +
- containerIdOptional.or("noattempt"));
+ this.log =
+ LoggerFactory.getLogger(GobblinMultiTaskAttempt.class.getName() + "-"
+ containerIdOptional.or("noattempt"));
this.jobBroker = jobBroker;
this.tasks = new ArrayList<>();
}
@@ -134,8 +129,7 @@ public class GobblinMultiTaskAttempt {
* @throws IOException
* @throws InterruptedException
*/
- public void run()
- throws IOException, InterruptedException {
+ public void run() throws IOException, InterruptedException {
if (!this.workUnits.hasNext()) {
log.warn("No work units to run in container " +
containerIdOptional.or(""));
return;
@@ -143,8 +137,7 @@ public class GobblinMultiTaskAttempt {
CountUpAndDownLatch countDownLatch = new CountUpAndDownLatch(0);
this.tasks = runWorkUnits(countDownLatch);
- log.info("Waiting for submitted tasks of job {} to complete in container
{}...", jobId,
- containerIdOptional.or(""));
+ log.info("Waiting for submitted tasks of job {} to complete in container
{}...", jobId, containerIdOptional.or(""));
try {
while (countDownLatch.getCount() > 0) {
if (this.interruptionPredicate.test(this)) {
@@ -183,8 +176,7 @@ public class GobblinMultiTaskAttempt {
* 3. persist task statestore.
* @throws IOException
*/
- public void commit()
- throws IOException {
+ public void commit() throws IOException {
if (this.tasks == null || this.tasks.isEmpty()) {
log.warn("No tasks to be committed in container " +
containerIdOptional.or(""));
return;
@@ -196,8 +188,7 @@ public class GobblinMultiTaskAttempt {
return new Callable<Void>() {
@Nullable
@Override
- public Void call()
- throws Exception {
+ public Void call() throws Exception {
task.commit();
return null;
}
@@ -208,8 +199,8 @@ public class GobblinMultiTaskAttempt {
try {
List<Either<Void, ExecutionException>> executionResults =
new IteratorExecutor<>(callableIterator,
this.getTaskCommitThreadPoolSize(),
- ExecutorsUtils.newDaemonThreadFactory(Optional.of(log),
Optional.of("Task-committing-pool-%d")))
- .executeAndGetResults();
+ ExecutorsUtils.newDaemonThreadFactory(Optional.of(log),
+
Optional.of("Task-committing-pool-%d"))).executeAndGetResults();
IteratorExecutor.logFailures(executionResults, log, 10);
} catch (InterruptedException ie) {
log.error("Committing of tasks interrupted. Aborting.");
@@ -229,10 +220,9 @@ public class GobblinMultiTaskAttempt {
* A method that shuts down all running tasks managed by this instance.
* TODO: Call this from the right place.
*/
- public void shutdownTasks()
- throws InterruptedException {
+ public void shutdownTasks() throws InterruptedException {
log.info("Shutting down tasks");
- for (Task task: this.tasks) {
+ for (Task task : this.tasks) {
task.shutdown();
}
@@ -249,8 +239,7 @@ public class GobblinMultiTaskAttempt {
}
}
- private void persistTaskStateStore()
- throws IOException {
+ private void persistTaskStateStore() throws IOException {
if (!this.taskStateStoreOptional.isPresent()) {
log.info("Task state store does not exist.");
return;
@@ -286,8 +275,8 @@ public class GobblinMultiTaskAttempt {
// If there are task failures then the tasks may be reattempted. Save
a copy of the task state that is used
// to filter out successful tasks on subsequent attempts.
- if (task.getTaskState().getWorkingState() ==
WorkUnitState.WorkingState.SUCCESSFUL ||
- task.getTaskState().getWorkingState() ==
WorkUnitState.WorkingState.COMMITTED) {
+ if (task.getTaskState().getWorkingState() ==
WorkUnitState.WorkingState.SUCCESSFUL
+ || task.getTaskState().getWorkingState() ==
WorkUnitState.WorkingState.COMMITTED) {
taskStateStore.put(task.getJobId(), task.getTaskId() +
TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX,
task.getTaskState());
}
@@ -417,8 +406,8 @@ public class GobblinMultiTaskAttempt {
}
}
- new EventSubmitter.Builder(JobMetrics.get(this.jobId, new
JobMetrics.CreatorTag(this.attemptId)).getMetricContext(), "gobblin.runtime")
- .build()
+ new EventSubmitter.Builder(JobMetrics.get(this.jobId, new
JobMetrics.CreatorTag(this.attemptId)).getMetricContext(),
+ "gobblin.runtime").build()
.submit(JobEvent.TASKS_SUBMITTED, "tasksCount",
Long.toString(countDownLatch.getRegisteredParties()));
return tasks;
@@ -432,21 +421,20 @@ public class GobblinMultiTaskAttempt {
this.log.info("Heap Memory");
this.log.info(String.format(format, "init", "used", "Committed", "max"));
- this.log.info(String.format(format, heapMemory.getInit(),
heapMemory.getUsed(),
- heapMemory.getCommitted(), heapMemory.getMax()));
+ this.log.info(String.format(format, heapMemory.getInit(),
heapMemory.getUsed(), heapMemory.getCommitted(),
+ heapMemory.getMax()));
this.log.info("Non-heap Memory");
this.log.info(String.format(format, "init", "used", "Committed", "max"));
- this.log.info(String.format(format, nonHeapMemory.getInit(),
nonHeapMemory.getUsed(),
- nonHeapMemory.getCommitted(), nonHeapMemory.getMax()));
+ this.log.info(String.format(format, nonHeapMemory.getInit(),
nonHeapMemory.getUsed(), nonHeapMemory.getCommitted(),
+ nonHeapMemory.getMax()));
}
-
private Task createTaskRunnable(WorkUnitState workUnitState, CountDownLatch
countDownLatch) {
Optional<TaskFactory> taskFactoryOpt =
TaskUtils.getTaskFactory(workUnitState);
if (taskFactoryOpt.isPresent()) {
- return new TaskIFaceWrapper(taskFactoryOpt.get().createTask(new
TaskContext(workUnitState)), new TaskContext(workUnitState),
- countDownLatch, this.taskStateTracker);
+ return new TaskIFaceWrapper(taskFactoryOpt.get().createTask(new
TaskContext(workUnitState)),
+ new TaskContext(workUnitState), countDownLatch,
this.taskStateTracker);
} else {
return new Task(new TaskContext(workUnitState), this.taskStateTracker,
this.taskExecutor,
Optional.of(countDownLatch));
@@ -477,7 +465,7 @@ public class GobblinMultiTaskAttempt {
* {@link JobMetrics#attemptRemove(String, Tag)}.
*/
public void cleanMetrics() {
- tasks.forEach(task-> {
+ tasks.forEach(task -> {
TaskMetrics.remove(task);
JobMetrics.attemptRemove(this.jobId, new
JobMetrics.CreatorTag(task.getTaskId()));
});
@@ -491,12 +479,12 @@ public class GobblinMultiTaskAttempt {
* updating the task state.
*/
public static GobblinMultiTaskAttempt runWorkUnits(JobContext jobContext,
Iterator<WorkUnit> workUnits,
- TaskStateTracker taskStateTracker, TaskExecutor taskExecutor,
- CommitPolicy multiTaskAttemptCommitPolicy)
+ TaskStateTracker taskStateTracker, TaskExecutor taskExecutor,
CommitPolicy multiTaskAttemptCommitPolicy)
throws IOException, InterruptedException {
GobblinMultiTaskAttempt multiTaskAttempt =
- new GobblinMultiTaskAttempt(workUnits, jobContext.getJobId(),
jobContext.getJobState(), taskStateTracker, taskExecutor,
- Optional.<String>absent(),
Optional.<StateStore<TaskState>>absent(), jobContext.getJobBroker());
+ new GobblinMultiTaskAttempt(workUnits, jobContext.getJobId(),
jobContext.getJobState(), taskStateTracker,
+ taskExecutor, Optional.<String>absent(),
Optional.<StateStore<TaskState>>absent(),
+ jobContext.getJobBroker());
multiTaskAttempt.runAndOptionallyCommitTaskAttempt(multiTaskAttemptCommitPolicy);
return multiTaskAttempt;
}
@@ -519,9 +507,8 @@ public class GobblinMultiTaskAttempt {
*/
public static GobblinMultiTaskAttempt runWorkUnits(String jobId, String
containerId, JobState jobState,
List<WorkUnit> workUnits, TaskStateTracker taskStateTracker,
TaskExecutor taskExecutor,
- StateStore<TaskState> taskStateStore,
- CommitPolicy multiTaskAttemptCommitPolicy,
SharedResourcesBroker<GobblinScopeTypes> jobBroker,
- Predicate<GobblinMultiTaskAttempt> interruptionPredicate)
+ StateStore<TaskState> taskStateStore, CommitPolicy
multiTaskAttemptCommitPolicy,
+ SharedResourcesBroker<GobblinScopeTypes> jobBroker,
Predicate<GobblinMultiTaskAttempt> interruptionPredicate)
throws IOException, InterruptedException {
// dump the work unit if tracking logs are enabled
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
index 6b6bc16..bcebb73 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalJobLauncher.java
@@ -24,6 +24,8 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.gobblin.metrics.event.CountEventBuilder;
+import org.apache.gobblin.metrics.event.JobEvent;
import org.apache.gobblin.runtime.job.JobInterruptionPredicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,6 +72,8 @@ public class LocalJobLauncher extends AbstractJobLauncher {
// Service manager to manage dependent services
private final ServiceManager serviceManager;
+ private Integer workUnitCount;
+
public LocalJobLauncher(Properties jobProps) throws Exception {
this(jobProps, null, ImmutableList.of());
}
@@ -124,18 +128,21 @@ public class LocalJobLauncher extends AbstractJobLauncher {
@Override
protected void runWorkUnitStream(WorkUnitStream workUnitStream) throws
Exception {
+
String jobId = this.jobContext.getJobId();
final JobState jobState = this.jobContext.getJobState();
Iterator<WorkUnit> workUnitIterator = workUnitStream.getWorkUnits();
if (!workUnitIterator.hasNext()) {
LOG.warn("No work units to run");
+ CountEventBuilder countEventBuilder = new
CountEventBuilder(JobEvent.WORK_UNITS_EMPTY, 0);
+ this.eventSubmitter.submit(countEventBuilder);
return;
}
-
TimingEvent workUnitsRunTimer =
this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.WORK_UNITS_RUN);
Iterator<WorkUnit> flattenedWorkUnits = new
MultiWorkUnitUnpackingIterator(workUnitStream.getWorkUnits());
+
Iterator<WorkUnit> workUnitsWithJobState =
Iterators.transform(flattenedWorkUnits, new Function<WorkUnit, WorkUnit>() {
@Override
public WorkUnit apply(WorkUnit workUnit) {
@@ -143,7 +150,6 @@ public class LocalJobLauncher extends AbstractJobLauncher {
return workUnit;
}
});
-
Thread thisThread = Thread.currentThread();
JobInterruptionPredicate jobInterruptionPredicate =
new JobInterruptionPredicate(jobState, () -> thisThread.interrupt(),
true);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index 0a2a284..0077d0f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -29,6 +29,9 @@ import java.util.concurrent.TimeoutException;
import org.apache.gobblin.fsm.FiniteStateMachine;
import org.apache.gobblin.fsm.StateWithCallbacks;
+import org.apache.gobblin.metrics.event.CountEventBuilder;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.JobEvent;
import org.apache.gobblin.metrics.event.JobStateEventBuilder;
import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine;
import org.apache.hadoop.conf.Configuration;
@@ -262,6 +265,10 @@ public class MRJobLauncher extends AbstractJobLauncher {
JobState jobState = this.jobContext.getJobState();
try {
+ CountEventBuilder countEventBuilder = new
CountEventBuilder(JobEvent.WORK_UNITS_CREATED, workUnits.size());
+ this.eventSubmitter.submit(countEventBuilder);
+ LOG.info("Emitting WorkUnitsCreated Count: " +
countEventBuilder.getCount());
+
prepareHadoopJob(workUnits);
// Start the output TaskState collector service