[
https://issues.apache.org/jira/browse/GOBBLIN-1493?focusedWorklogId=628089&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-628089
]
ASF GitHub Bot logged work on GOBBLIN-1493:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 27/Jul/21 00:24
Start Date: 27/Jul/21 00:24
Worklog Time Spent: 10m
Work Description: Will-Lo commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r677025828
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -463,6 +469,20 @@ public void apply(JobListener jobListener, JobContext jobContext)
return;
}
+ // calculation of total bytes to copy in a job used to track a job's copy progress
+ if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_PROGRESS, false)) {
+ if (workUnitStream.isSafeToMaterialize()) {
+ Collection<WorkUnit> workUnits = JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
+ long totalSizeInBytes = sumWorkUnitsSizes(workUnits);
+ this.jobContext.getJobState().setProp(TOTAL_BYTES_TO_COPY, totalSizeInBytes);
+ } else {
+ throw new RuntimeException("Property " + ConfigurationKeys.REPORT_JOB_PROGRESS + " is turned on, but "
+ + "progress cannot be collected because " + DestinationDatasetHandlerService.class.getName()
+ + " does not support work unit streams. Turn off property " + ConfigurationKeys.REPORT_JOB_PROGRESS
+ + " and rerun job.");
Review comment:
I don't think it's necessarily because `DestinationDatasetHandler` doesn't support work unit streams; rather, this feature itself also doesn't support work unit streams, since the total size is determined only once, at the beginning.
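The reviewer's point is that the total is computed a single time, before any task runs, so it is only meaningful for a finite, fully materialized set of work units. A minimal sketch of that one-shot summation, assuming each work unit can be modeled as a plain `Map<String, String>` of properties (a stand-in for Gobblin's `WorkUnit`) and that the byte-size key is the hypothetical string `workunit.byte.size` rather than the real value of `ServiceConfigKeys.WORK_UNIT_BYTE_SIZE`:

```java
import java.util.Collection;
import java.util.Map;

final class WorkUnitSizeSum {
  // Hypothetical key standing in for ServiceConfigKeys.WORK_UNIT_BYTE_SIZE.
  static final String WORK_UNIT_BYTE_SIZE = "workunit.byte.size";

  private WorkUnitSizeSum() {}

  // Sums the declared byte size of every work unit exactly once, up front.
  // Work units that carry no size property contribute 0 bytes.
  static long sumWorkUnitsSizes(Collection<Map<String, String>> workUnits) {
    long total = 0L;
    for (Map<String, String> workUnit : workUnits) {
      String size = workUnit.get(WORK_UNIT_BYTE_SIZE);
      if (size != null) {
        total += Long.parseLong(size);
      }
    }
    return total;
  }
}
```

Because the loop must see every element before it can return, it cannot produce a total for a stream whose work units are not all known at launch time.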
##########
File path:
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
##########
@@ -382,9 +384,17 @@ public Void call() {
setWorkUnitWatermark(workUnit, watermarkGenerator, copyEntity);
computeAndSetWorkUnitGuid(workUnit);
addLineageInfo(copyEntity, workUnit);
- if (copyEntity instanceof CopyableFile && DistcpFileSplitter.allowSplit(this.state, this.targetFs)) {
- workUnitsForPartition.addAll(DistcpFileSplitter.splitFile((CopyableFile) copyEntity, workUnit, this.targetFs));
+ if (copyEntity instanceof CopyableFile) {
+ CopyableFile castedCopyEntity = (CopyableFile) copyEntity;
+ fileSize = castedCopyEntity.getFileStatus().getLen();
+ workUnit.setProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE, fileSize);
+ if (DistcpFileSplitter.allowSplit(this.state, this.targetFs)) {
+ workUnitsForPartition.addAll(DistcpFileSplitter.splitFile((CopyableFile) copyEntity, workUnit, this.targetFs));
+ } else {
+ workUnitsForPartition.add(workUnit);
+ }
} else {
+ workUnit.setProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE, 0);
Review comment:
Can you leave a comment explaining when this condition applies? If I recall correctly, it's for some state files/post-publish steps.
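The branch under discussion gives every work unit a byte size: the file length for a `CopyableFile`, and `0` for any other copy entity, which the reviewer recalls are things like state files or post-publish steps that move no file data. A hedged sketch of that rule, using hypothetical stand-in types (`CopyEntity`, `FileEntity`, `NonFileEntity`) instead of Gobblin's real classes:

```java
// Hypothetical stand-in for Gobblin's copy entity hierarchy.
interface CopyEntity {}

// Stands in for CopyableFile; lengthInBytes plays the role of getFileStatus().getLen().
final class FileEntity implements CopyEntity {
  final long lengthInBytes;

  FileEntity(long lengthInBytes) {
    this.lengthInBytes = lengthInBytes;
  }
}

// Stands in for non-file entities such as post-publish steps.
final class NonFileEntity implements CopyEntity {}

final class WorkUnitByteSize {
  private WorkUnitByteSize() {}

  // Mirrors the two branches in the diff: files report their length,
  // every other entity reports 0 because it copies no file bytes.
  static long byteSizeOf(CopyEntity entity) {
    if (entity instanceof FileEntity) {
      return ((FileEntity) entity).lengthInBytes;
    }
    return 0L;
  }
}
```

Recording an explicit `0` rather than leaving the property unset presumably keeps the later progress reporter from hitting its "property null" warning path for these work units.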
##########
File path:
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
##########
@@ -382,9 +384,17 @@ public Void call() {
setWorkUnitWatermark(workUnit, watermarkGenerator, copyEntity);
computeAndSetWorkUnitGuid(workUnit);
addLineageInfo(copyEntity, workUnit);
- if (copyEntity instanceof CopyableFile && DistcpFileSplitter.allowSplit(this.state, this.targetFs)) {
- workUnitsForPartition.addAll(DistcpFileSplitter.splitFile((CopyableFile) copyEntity, workUnit, this.targetFs));
+ if (copyEntity instanceof CopyableFile) {
+ CopyableFile castedCopyEntity = (CopyableFile) copyEntity;
+ fileSize = castedCopyEntity.getFileStatus().getLen();
+ workUnit.setProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE, fileSize);
+ if (DistcpFileSplitter.allowSplit(this.state, this.targetFs)) {
+ workUnitsForPartition.addAll(DistcpFileSplitter.splitFile((CopyableFile) copyEntity, workUnit, this.targetFs));
+ } else {
+ workUnitsForPartition.add(workUnit);
+ }
} else {
+ workUnit.setProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE, 0);
Review comment:
Can we leave a comment for when this case is true? From what I remember, it had to do with local state files/post-publishing steps also existing in the directory.
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -644,6 +660,15 @@ public void apply(JobListener jobListener, JobContext jobContext)
}
}
+ public static long sumWorkUnitsSizes (Collection<WorkUnit> workUnits) {
Review comment:
If you add the `@VisibleForTesting` annotation, the function should be protected or private; in this case I think protected is best.
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -463,6 +469,20 @@ public void apply(JobListener jobListener, JobContext jobContext)
return;
}
+ // calculation of total bytes to copy in a job used to track a job's copy progress
+ if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_PROGRESS, false)) {
+ if (workUnitStream.isSafeToMaterialize()) {
+ Collection<WorkUnit> workUnits = JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
+ long totalSizeInBytes = sumWorkUnitsSizes(workUnits);
+ this.jobContext.getJobState().setProp(TOTAL_BYTES_TO_COPY, totalSizeInBytes);
+ } else {
+ throw new RuntimeException("Property " + ConfigurationKeys.REPORT_JOB_PROGRESS + " is turned on, but "
+ + "progress cannot be collected because " + DestinationDatasetHandlerService.class.getName()
+ + " does not support work unit streams. Turn off property " + ConfigurationKeys.REPORT_JOB_PROGRESS
+ + " and rerun job.");
Review comment:
We can put a better exception message here to let the user know that copy progress reporting doesn't work for infinite streams (which makes sense: how would we determine that the work is completed?).
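One way to phrase such a message is to say directly that progress is a fraction of a total computed up front, which requires every work unit to be known at launch. The sketch below is illustrative only: the class and method names are hypothetical, the property name is passed in rather than read from `ConfigurationKeys`, and it throws `IllegalStateException` (a `RuntimeException` subclass) instead of a bare `RuntimeException`.

```java
final class ProgressReportingCheck {
  private ProgressReportingCheck() {}

  // Hypothetical guard: progress is measured against a total computed once,
  // so it is only defined when all work units can be materialized up front.
  static void checkProgressReportingSupported(String propertyName, boolean safeToMaterialize) {
    if (!safeToMaterialize) {
      throw new IllegalStateException(
          "Property " + propertyName + " is enabled, but job progress can only be reported when "
              + "all work units can be materialized up front to compute the total bytes to copy. "
              + "This job produces a work unit stream that cannot be materialized (for example, an "
              + "unbounded stream), so completion cannot be measured. Disable " + propertyName
              + " and rerun the job.");
    }
  }
}
```

The message names the actual limitation of the feature rather than attributing it to `DestinationDatasetHandlerService`, which also addresses the earlier review comment on this hunk.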
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -235,6 +255,49 @@ public Void call() throws Exception {
this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
}
+ /**
+ * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
+ * GobblinTrackingEvents
+ * @param taskState of job launched
+ */
+ private void reportJobProgress(TaskState taskState) {
+ String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+ if (stringSize == null) {
+ LOGGER.warn("Expected to report job progress but work unit byte size property null");
+ return;
+ }
+
+ Long taskByteSize = Long.parseLong(stringSize);
+ // if progress reporting is enabled, value should be present
+ String strTotalSizeToCopy = this.jobState.getProp(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
Review comment:
You can use `PropertiesUtils.getPropAsLong` here (and for the task byte size) instead of pulling the String. Instead of checking for null, check that the property exists in the state.
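A sketch of the suggested shape: check that both properties exist first, then parse them as longs, rather than fetching a String and null-checking it. This models job and task state as `java.util.Properties`, uses hypothetical key strings in place of `ServiceConfigKeys.WORK_UNIT_BYTE_SIZE` and `AbstractJobLauncher.TOTAL_BYTES_TO_COPY`, and returns the percentage (with `-1` as an assumed "not reportable" sentinel) instead of emitting a `GobblinTrackingEvent`, purely so the logic is testable.

```java
import java.util.Properties;

final class JobProgressSketch {
  // Hypothetical keys standing in for the Gobblin constants named above.
  static final String WORK_UNIT_BYTE_SIZE = "workunit.byte.size";
  static final String TOTAL_BYTES_TO_COPY = "job.total.bytes.to.copy";

  private final Properties jobState;
  private long bytesCopied = 0L;

  JobProgressSketch(Properties jobState) {
    this.jobState = jobState;
  }

  // Adds a finished task's bytes and returns the job's completion percentage.
  // Returns -1 (assumed sentinel) when either size property is absent.
  int reportJobProgress(Properties taskState) {
    if (!taskState.containsKey(WORK_UNIT_BYTE_SIZE) || !jobState.containsKey(TOTAL_BYTES_TO_COPY)) {
      return -1;
    }
    long taskByteSize = Long.parseLong(taskState.getProperty(WORK_UNIT_BYTE_SIZE));
    long totalBytes = Long.parseLong(jobState.getProperty(TOTAL_BYTES_TO_COPY));
    bytesCopied += taskByteSize;
    if (totalBytes <= 0L) {
      return 100; // assumption: a job with nothing to copy counts as complete
    }
    return (int) Math.min(100L, bytesCopied * 100L / totalBytes);
  }
}
```

With a total of 400 bytes, finishing a 100-byte task reports 25 and a further 300-byte task reports 100.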
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
##########
@@ -105,6 +105,8 @@ protected JobStatus getJobStatus(State jobState) {
int maxAttempts = Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, "1"));
int currentAttempts = Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, "1"));
boolean shouldRetry = Boolean.parseBoolean(jobState.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, "false"));
+ int progressPercentage = Integer.parseInt(jobState.getProp(TimingEvent.JOB_COMPLETION_PERCENTAGE, "0"));
+ long lastProgressEventTime = Long.parseLong(jobState.getProp(TimingEvent.JOB_LAST_PROGRESS_PERCENT_TIME, "0"));
Review comment:
Similar comment here: you can use `PropertiesUtils` to parse ints/longs.
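The benefit of a typed helper is that "read, default if absent, parse" lives in one place instead of being repeated at every call site. The helpers below are hypothetical and written against `java.util.Properties`; they only illustrate the idea and are not the signatures of Gobblin's `PropertiesUtils`. The key strings in the usage are likewise made up, standing in for `TimingEvent.JOB_COMPLETION_PERCENTAGE` and `TimingEvent.JOB_LAST_PROGRESS_PERCENT_TIME`.

```java
import java.util.Properties;

final class JobStatusFields {
  private JobStatusFields() {}

  // Hypothetical helper: returns defaultValue when the key is absent,
  // otherwise parses the stored value as an int.
  static int getPropAsInt(Properties props, String key, int defaultValue) {
    String value = props.getProperty(key);
    return value == null ? defaultValue : Integer.parseInt(value.trim());
  }

  // Hypothetical helper: same contract as getPropAsInt, for longs.
  static long getPropAsLong(Properties props, String key, long defaultValue) {
    String value = props.getProperty(key);
    return value == null ? defaultValue : Long.parseLong(value.trim());
  }
}
```

A call such as `getPropAsInt(state, "job.completion.percentage", 0)` then replaces the `Integer.parseInt(jobState.getProp(..., "0"))` pattern in the hunk above.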
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -644,6 +660,15 @@ public void apply(JobListener jobListener, JobContext jobContext)
}
}
+ public static long sumWorkUnitsSizes (Collection<WorkUnit> workUnits) {
Review comment:
If you mark it `@VisibleForTesting`, you should make the function private or protected (probably protected in this scenario).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
Issue Time Tracking
-------------------
Worklog Id: (was: 628089)
Time Spent: 2h (was: 1h 50m)
> Data Copy Progress Reporting
> -----------------------------
>
> Key: GOBBLIN-1493
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1493
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-core, gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 2h
> Remaining Estimate: 0h
>
> Progress reporting for a data copy will provide users with quantitative
> feedback on the progress of a data copy job as a percentage as well as an
> estimate of the time remaining for completion. This will update the existing
> job status endpoint to include the progress percentage and estimate of time
> left.
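The description asks for both a percentage and an estimate of time remaining. One simple way to derive the estimate is linear extrapolation from the average rate so far, assuming copy throughput stays roughly constant; this is an illustrative choice, not necessarily what the feature implements, and the class and method names are hypothetical.

```java
final class CopyProgressEstimate {
  private CopyProgressEstimate() {}

  // Percentage of bytes copied so far, clamped to [0, 100].
  // A job with no bytes to copy is treated as complete (assumption).
  static int percentComplete(long bytesCopied, long totalBytes) {
    if (totalBytes <= 0L) {
      return 100;
    }
    long pct = bytesCopied * 100L / totalBytes;
    return (int) Math.max(0L, Math.min(100L, pct));
  }

  // Estimated milliseconds remaining at the average rate observed so far:
  // remaining = elapsed * (total - copied) / copied.
  // Returns -1 when nothing has been copied yet, since no rate is known.
  static long estimatedMillisRemaining(long bytesCopied, long totalBytes, long elapsedMillis) {
    if (bytesCopied <= 0L) {
      return -1L;
    }
    long remainingBytes = Math.max(0L, totalBytes - bytesCopied);
    return (long) ((double) elapsedMillis * remainingBytes / bytesCopied);
  }
}
```

For example, 250 of 1000 bytes copied after 60 seconds gives 25% and an estimate of 180 seconds left. Linear extrapolation is easy to reason about but can swing widely early in a job, when few tasks have completed.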
--
This message was sent by Atlassian Jira
(v8.3.4#803005)