jack-moseley commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r677791888



##########
File path: gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
##########
@@ -681,6 +681,8 @@
   public static final String TASK_STATE_COLLECTOR_INTERVAL_SECONDS = "task.state.collector.interval.secs";
   public static final int DEFAULT_TASK_STATE_COLLECTOR_INTERVAL_SECONDS = 60;
   public static final String TASK_STATE_COLLECTOR_HANDLER_CLASS = "task.state.collector.handler.class";
+  public static final String REPORT_JOB_PROGRESS = "job.report.progress";

Review comment:
       I would also add a constant `public static final boolean DEFAULT_REPORT_JOB_PROGRESS = false;`. That makes it a bit easier to change the default later.
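
   A minimal sketch of that suggestion, assuming a stand-in class rather than the real `ConfigurationKeys`. `ProgressConfigKeys` and `isProgressReportingEnabled` are hypothetical names used only for illustration; the key string and the proposed `DEFAULT_REPORT_JOB_PROGRESS` constant come from the diff and the comment above.

```java
import java.util.Properties;

// Hypothetical stand-in for ConfigurationKeys; only the key string and the
// proposed default constant are taken from the review discussion.
final class ProgressConfigKeys {
  static final String REPORT_JOB_PROGRESS = "job.report.progress";
  // Keeping the default next to the key means a later change touches one line.
  static final boolean DEFAULT_REPORT_JOB_PROGRESS = false;

  private ProgressConfigKeys() {}

  // Reads the flag and falls back to the shared default when the key is unset.
  static boolean isProgressReportingEnabled(Properties props) {
    String raw = props.getProperty(REPORT_JOB_PROGRESS);
    return raw == null ? DEFAULT_REPORT_JOB_PROGRESS : Boolean.parseBoolean(raw);
  }
}
```

   In the PR itself, the call site would then presumably read `jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_PROGRESS, ConfigurationKeys.DEFAULT_REPORT_JOB_PROGRESS)` instead of passing a literal `false`.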

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -463,6 +469,19 @@ public void apply(JobListener jobListener, JobContext jobContext)
           return;
         }
 
+        // calculation of total bytes to copy in a job used to track a job's copy progress
+        if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_PROGRESS, false)) {
+          if (workUnitStream.isSafeToMaterialize()) {
+            Collection<WorkUnit> workUnits = JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
+            long totalSizeInBytes = sumWorkUnitsSizes(workUnits);
+            this.jobContext.getJobState().setProp(TOTAL_BYTES_TO_COPY, totalSizeInBytes);
+          } else {
+            throw new RuntimeException("Property " + ConfigurationKeys.REPORT_JOB_PROGRESS + " is turned on, but "
+                + "progress cannot be reported for infinite work unit streams. Turn off property "
+                + ConfigurationKeys.REPORT_JOB_PROGRESS + " and rerun job.");

Review comment:
       IMO this could just print a `log.error` line instead of terminating the job. I think most other metrics-related failures in Gobblin just log the issue instead of throwing an exception.
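
   A rough sketch of the log-and-continue shape, under stated assumptions: `ProgressSetup` and `totalBytesOrSkip` are hypothetical names, `java.util.logging` stands in for the `log.error` call so the snippet is self-contained, and the boolean and supplier parameters replace the real `workUnitStream` calls from the diff.

```java
import java.util.OptionalLong;
import java.util.function.LongSupplier;
import java.util.logging.Logger;

// Hypothetical helper; it is not part of AbstractJobLauncher.
final class ProgressSetup {
  private static final Logger LOG = Logger.getLogger(ProgressSetup.class.getName());

  private ProgressSetup() {}

  // Returns the total size when the stream can be materialized. Otherwise it
  // logs an error and returns empty so the job keeps running without progress reports.
  static OptionalLong totalBytesOrSkip(boolean safeToMaterialize, LongSupplier sizeCalculator) {
    if (!safeToMaterialize) {
      // java.util.logging's severe() is used here as a stand-in for log.error.
      LOG.severe("Property job.report.progress is turned on, but progress cannot be "
          + "reported for infinite work unit streams. Skipping progress reporting.");
      return OptionalLong.empty();
    }
    return OptionalLong.of(sizeCalculator.getAsLong());
  }
}
```

   The caller would set `TOTAL_BYTES_TO_COPY` only when a value is present, so a job with an infinite work unit stream still runs and only the progress metric is lost.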

##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
##########
@@ -199,4 +206,20 @@ public static FlowExecution convertFlowStatus(org.apache.gobblin.service.monitor
   private static long getFlowStartTime(org.apache.gobblin.service.monitoring.FlowStatus flowStatus) {
     return flowStatus.getFlowExecutionId();
   }
+
+  /**
+   * Estimate the time left to complete the copy based on the following formula -
+   * timeLeft = (100/completionPercentage) * timeElapsed

Review comment:
       Is this formula correct? If a job completed 50% in 10 minutes, the formula would return `(100/50)*(10 minutes) = 20 minutes`, which is the total expected time rather than the expected remaining time, right?

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -235,6 +255,49 @@ public Void call() throws Exception {
     this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
   }
 
+  /**
+   * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
+   * GobblinTrackingEvents
+   * @param taskState of job launched
+   */
+  private void reportJobProgress(TaskState taskState) {
+    String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+    if (stringSize == null) {
+      LOGGER.warn("Expected to report job progress but work unit byte size property null");
+      return;
+    }
+
+    Long taskByteSize = Long.parseLong(stringSize);
+
+    // if progress reporting is enabled, value should be present
+    if (!this.jobState.contains(AbstractJobLauncher.TOTAL_BYTES_TO_COPY)) {
+      LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+      return;
+    }
+    this.totalSizeToCopy = this.jobState.getPropAsLong(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
+
+    // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
+    this.bytesCopiedSoFar += taskByteSize;
+    Double newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+
+    if (newPercentageCopied - this.lastPercentageReported > ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD) {
+      this.lastPercentageReported = newPercentageCopied;
+      int percentageToReport = (int) Math.round(this.lastPercentageReported * 100);
+
+      Map<String, String> progress = new HashMap<>();
+      progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));

Review comment:
       I don't think adding these properties is actually required. You are using the `eventSubmitter` passed from `MRJobLauncher`, which already includes a bunch of additional metadata by default, including all three of these properties.
   
   If you look at other job-level events sent by GaaS, such as `WorkUnitsCreated`, they already contain these properties even though they are never added explicitly.
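
   The general pattern can be sketched as follows. This is a hypothetical stand-in, not Gobblin's `EventSubmitter`: `SketchEventSubmitter`, `buildEventMetadata`, and the metadata key strings in the usage note are placeholders chosen for illustration. It only shows how a submitter configured with base metadata attaches it to every event without the caller repeating it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in used only to illustrate the pattern.
final class SketchEventSubmitter {
  private final Map<String, String> baseMetadata;

  // Base metadata is supplied once, when the submitter is created.
  SketchEventSubmitter(Map<String, String> baseMetadata) {
    this.baseMetadata = new HashMap<>(baseMetadata);
  }

  // Builds the metadata one event would carry: the base entries first, then
  // the per-event entries (which win if a key appears in both).
  Map<String, String> buildEventMetadata(Map<String, String> eventMetadata) {
    Map<String, String> merged = new HashMap<>(baseMetadata);
    merged.putAll(eventMetadata);
    return merged;
  }
}
```

   With base metadata holding placeholder keys such as `flowGroup`, `flowName`, and `flowExecutionId`, a caller that passes only the completion percentage still gets an event carrying all four entries.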




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
