jack-moseley commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r677791888
##########
File path:
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
##########
@@ -681,6 +681,8 @@
public static final String TASK_STATE_COLLECTOR_INTERVAL_SECONDS = "task.state.collector.interval.secs";
public static final int DEFAULT_TASK_STATE_COLLECTOR_INTERVAL_SECONDS = 60;
public static final String TASK_STATE_COLLECTOR_HANDLER_CLASS = "task.state.collector.handler.class";
+ public static final String REPORT_JOB_PROGRESS = "job.report.progress";
Review comment:
I would also add a constant `public static final boolean
DEFAULT_REPORT_JOB_PROGRESS = false;`. That makes it a bit easier to change
the default later.
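A minimal sketch of the idea, assuming a plain `java.util.Properties` lookup
stands in for Gobblin's `getPropAsBoolean(key, default)`. The class
`ProgressConfigSketch` and the method `isProgressReportingEnabled` are
hypothetical names used only for illustration:

```java
import java.util.Properties;

// Hypothetical stand-in for the two constants in ConfigurationKeys.
class ProgressConfigSketch {
  static final String REPORT_JOB_PROGRESS = "job.report.progress";
  // Suggested default constant; callers reference it instead of a literal false.
  static final boolean DEFAULT_REPORT_JOB_PROGRESS = false;

  // Reads the flag, falling back to the shared default when the key is absent.
  static boolean isProgressReportingEnabled(Properties props) {
    String value = props.getProperty(REPORT_JOB_PROGRESS);
    return value == null ? DEFAULT_REPORT_JOB_PROGRESS : Boolean.parseBoolean(value);
  }
}
```

Every call site then shares one default, so changing it later is a one-line edit.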
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -463,6 +469,19 @@ public void apply(JobListener jobListener, JobContext jobContext)
return;
}
+ // calculation of total bytes to copy in a job used to track a job's copy progress
+ if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_PROGRESS, false)) {
+ if (workUnitStream.isSafeToMaterialize()) {
+ Collection<WorkUnit> workUnits = JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
+ long totalSizeInBytes = sumWorkUnitsSizes(workUnits);
+ this.jobContext.getJobState().setProp(TOTAL_BYTES_TO_COPY, totalSizeInBytes);
+ } else {
+ throw new RuntimeException("Property " + ConfigurationKeys.REPORT_JOB_PROGRESS + " is turned on, but "
+ + "progress cannot be reported for infinite work unit streams. Turn off property "
+ + ConfigurationKeys.REPORT_JOB_PROGRESS + " and rerun job.");
Review comment:
IMO this could just emit a `log.error` line instead of terminating the
job. I think most other metrics-related failures in Gobblin just log the
issue instead of throwing an exception.
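A rough sketch of the log-and-continue shape, not the actual
`AbstractJobLauncher` code. `java.util.logging` stands in for the project's
logger, and `ProgressSetupSketch` and `totalBytesToCopy` are hypothetical
names:

```java
import java.util.OptionalLong;
import java.util.logging.Logger;

// Hypothetical sketch: java.util.logging stands in for the launcher's logger.
class ProgressSetupSketch {
  private static final Logger LOG = Logger.getLogger(ProgressSetupSketch.class.getName());

  // Returns the total bytes to copy, or empty when the stream cannot be materialized.
  static OptionalLong totalBytesToCopy(boolean safeToMaterialize, long[] workUnitSizes) {
    if (!safeToMaterialize) {
      // Log and continue instead of throwing, so the job itself still runs.
      LOG.severe("Property job.report.progress is turned on, but progress cannot be "
          + "reported for infinite work unit streams. Skipping progress reporting.");
      return OptionalLong.empty();
    }
    long total = 0L;
    for (long size : workUnitSizes) {
      total += size;
    }
    return OptionalLong.of(total);
  }
}
```

The caller would set `TOTAL_BYTES_TO_COPY` only when a value is present, so the
job runs without progress events rather than failing.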
##########
File path:
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
##########
@@ -199,4 +206,20 @@ public static FlowExecution convertFlowStatus(org.apache.gobblin.service.monitor
private static long getFlowStartTime(org.apache.gobblin.service.monitoring.FlowStatus flowStatus) {
return flowStatus.getFlowExecutionId();
}
+
+ /**
+ * Estimate the time left to complete the copy based on the following formula -
+ * timeLeft = (100/completionPercentage) * timeElapsed
Review comment:
Is this formula correct? If a job completed 50% in 10 minutes, the
formula would return `(100/50)*(10 minutes) = 20 minutes`, which is the total
expected time rather than the expected remaining time (10 minutes), right?
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -235,6 +255,49 @@ public Void call() throws Exception {
this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
}
+ /**
+ * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
+ * GobblinTrackingEvents
+ * @param taskState of job launched
+ */
+ private void reportJobProgress(TaskState taskState) {
+ String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+ if (stringSize == null) {
+ LOGGER.warn("Expected to report job progress but work unit byte size property null");
+ return;
+ }
+
+ Long taskByteSize = Long.parseLong(stringSize);
+
+ // if progress reporting is enabled, value should be present
+ if (!this.jobState.contains(AbstractJobLauncher.TOTAL_BYTES_TO_COPY)) {
+ LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+ return;
+ }
+ this.totalSizeToCopy = this.jobState.getPropAsLong(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
+
+ // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
+ this.bytesCopiedSoFar += taskByteSize;
+ Double newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+
+ if (newPercentageCopied - this.lastPercentageReported > ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD) {
+ this.lastPercentageReported = newPercentageCopied;
+ int percentageToReport = (int) Math.round(this.lastPercentageReported * 100);
+
+ Map<String, String> progress = new HashMap<>();
+ progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));
+ progress.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+ this.jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY));
+ progress.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+ this.jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY));
+ progress.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+ this.jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
Review comment:
I don't think adding these properties is actually required. You are using
the `eventSubmitter` passed from `MRJobLauncher`, which already adds a set of
default metadata that includes all three of these properties.
If you look at other job-level events sent by GaaS, such as
`WorkUnitsCreated`, they already contain these properties despite not adding
them explicitly.