Repository: incubator-gobblin Updated Branches: refs/heads/master 46c604067 -> 56be9b230
[GOBBLIN-537] Dump workunits to logs for debugging Closes #2400 from htran1/workunit_logging Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/56be9b23 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/56be9b23 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/56be9b23 Branch: refs/heads/master Commit: 56be9b230ae9324661ae1af514b8063ca961b88d Parents: 46c6040 Author: Hung Tran <[email protected]> Authored: Tue Jul 17 11:24:20 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Tue Jul 17 11:24:20 2018 -0700 ---------------------------------------------------------------------- .../gobblin/configuration/ConfigurationKeys.java | 1 + .../apache/gobblin/source/workunit/MultiWorkUnit.java | 3 +++ .../org/apache/gobblin/source/workunit/WorkUnit.java | 2 +- .../apache/gobblin/runtime/AbstractJobLauncher.java | 13 +++++++++++++ .../gobblin/runtime/GobblinMultiTaskAttempt.java | 7 +++++++ 5 files changed, 25 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56be9b23/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 01fa490..47233e8 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -178,6 +178,7 @@ public class ConfigurationKeys { public static final String WORK_UNIT_RETRY_ENABLED_KEY = "workunit.retry.enabled"; public static final String WORK_UNIT_CREATION_TIME_IN_MILLIS = "workunit.creation.time.in.millis"; public static final String WORK_UNIT_CREATION_AND_RUN_INTERVAL = "workunit.creation.and.run.interval"; + public static final String WORK_UNIT_ENABLE_TRACKING_LOGS = "workunit.enableTrackingLogs"; public static final String JOB_RUN_ONCE_KEY = "job.runonce"; public static final String JOB_DISABLED_KEY = "job.disabled"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56be9b23/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java index d521974..d254de0 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java @@ -26,6 +26,8 @@ import java.util.List; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import lombok.ToString; + /** * A class that wraps multiple {@link WorkUnit}s so they can executed within a single task. @@ -40,6 +42,7 @@ import com.google.common.collect.Lists; * * @author Yinan Li */ +@ToString(callSuper = true) public class MultiWorkUnit extends WorkUnit { private final List<WorkUnit> workUnits = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56be9b23/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java index 7d3f5d3..bf38c35 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java @@ -45,7 +45,7 @@ import lombok.ToString; * * @author kgoodhop */ -@ToString +@ToString(callSuper=true) public class WorkUnit extends State { private Extract extract; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56be9b23/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java index 0c50c32..7f359dc 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java @@ -433,6 +433,19 @@ public abstract class AbstractJobLauncher implements JobLauncher { jobState.addTaskState(new TaskState(new WorkUnitState(workUnit, jobState))); } }); + + // dump the work unit if tracking logs are enabled + if (jobState.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_ENABLE_TRACKING_LOGS)) { + workUnitStream = workUnitStream.transform(new Function<WorkUnit, WorkUnit>() { + @Nullable + @Override + public WorkUnit apply(@Nullable WorkUnit input) { + LOG.info("Work unit tracking log: {}", input); + return input; + } + }); + } + workUnitsPreparationTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext, EventName.WORK_UNITS_PREPARATION)); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56be9b23/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java index e5643c0..e06f338 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java @@ -421,6 +421,13 @@ public class GobblinMultiTaskAttempt { StateStore<TaskState> taskStateStore, CommitPolicy multiTaskAttemptCommitPolicy, SharedResourcesBroker<GobblinScopeTypes> jobBroker) throws IOException, InterruptedException { + + // dump the work unit if tracking logs are enabled + if (jobState.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_ENABLE_TRACKING_LOGS)) { + Logger log = LoggerFactory.getLogger(GobblinMultiTaskAttempt.class.getName()); + log.info("Work unit tracking log: {}", workUnits); + } + GobblinMultiTaskAttempt multiTaskAttempt = new GobblinMultiTaskAttempt(workUnits.iterator(), jobId, jobState, taskStateTracker, taskExecutor, Optional.of(containerId), Optional.of(taskStateStore), jobBroker);
