[
https://issues.apache.org/jira/browse/GOBBLIN-1493?focusedWorklogId=626948&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-626948
]
ASF GitHub Bot logged work on GOBBLIN-1493:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 23/Jul/21 00:20
Start Date: 23/Jul/21 00:20
Worklog Time Spent: 10m
Work Description: aplex commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r675209138
##########
File path:
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatistics.pdl
##########
@@ -19,4 +19,14 @@ record JobStatistics {
* number of records processed in the last job execution
*/
processedCount: long
+
+ /**
+ * data copy progress as a percentage
+ */
+ copyProgress: long
+
+ /**
+ * estimate of time left until copy completion
+ */
+ estimatedTimeLeft: long
Review comment:
What is the unit of time here? It could be worth adding the unit to the
name, e.g. estimatedSecondsToCompletion, estimatedTimeLeftSeconds, or something
similar.
##########
File path:
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
##########
@@ -199,4 +204,17 @@ public static FlowExecution
convertFlowStatus(org.apache.gobblin.service.monitor
private static long
getFlowStartTime(org.apache.gobblin.service.monitoring.FlowStatus flowStatus) {
return flowStatus.getFlowExecutionId();
}
+
+ /**
+ * Estimate the time left to complete the copy based on the following formula -
+ * timeLeft = (100/completionPercentage) * timeElapsed
+ */
+ public static long estimateCopyTimeLeft(Long currentTime, Long startTime, Long completionPercentage) {
Review comment:
Inside the code, I suggest using Duration/Interval classes to represent
time periods. That removes any question about time units. Restli does not
have a built-in type for time, so we had to convert it to long or string.
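As a rough illustration of the Duration suggestion above, the sketch below
keeps time as java.time types inside the code and converts to a long only at
the Rest.li boundary. The class name CopyTimeEstimate, both method names, and
the choice of seconds as the wire unit are assumptions made for illustration;
none of them come from the PR. The arithmetic follows the formula quoted in
the Javadoc under review.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper; the names and the seconds unit are assumptions. */
final class CopyTimeEstimate {
  private CopyTimeEstimate() {
  }

  /**
   * Applies the formula from the Javadoc under review:
   * (100 / completionPercentage) * timeElapsed.
   * Multiplying before dividing avoids losing precision to integer division.
   */
  static Duration estimate(Instant startTime, Instant currentTime, long completionPercentage) {
    if (completionPercentage <= 0 || completionPercentage > 100) {
      throw new IllegalArgumentException(
          "completionPercentage must be in (0, 100], got " + completionPercentage);
    }
    Duration elapsed = Duration.between(startTime, currentTime);
    return elapsed.multipliedBy(100).dividedBy(completionPercentage);
  }

  /** Converts to a plain long only where the value leaves the Java code. */
  static long toWireSeconds(Duration duration) {
    return duration.getSeconds();
  }
}
```

With a start of 20 seconds, a current time of 50 seconds, and 10 percent
complete, this yields a Duration of 300 seconds, matching the value asserted
in the PR's test if its inputs are read as seconds.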
##########
File path:
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatistics.pdl
##########
@@ -19,4 +19,14 @@ record JobStatistics {
* number of records processed in the last job execution
*/
processedCount: long
+
+ /**
+ * data copy progress as a percentage
+ */
+ copyProgress: long
Review comment:
I think this is not just for copy. Ingestion jobs can have progress too.
For example, we were chatting about having MySQL ingestion through
GaaS.
##########
File path:
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandlerTest.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class FlowExecutionResourceLocalHandlerTest {
+
+ @Test
+ public void testEstimateCopyTimeLeftSimple() throws Exception {
+ long currentTime = 50;
+ long startTime = 20;
+ long copyPercentage = 10;
+
+ long timeLeft = FlowExecutionResourceLocalHandler.estimateCopyTimeLeft(currentTime, startTime, copyPercentage);
+ Assert.assertEquals(timeLeft,300);
Review comment:
There is a missing space after the comma. For new files, you can basically
run the "silent code cleanup" command in IntelliJ. It will correct all code
style issues in the file. You can bind it to a shortcut for more convenience. I
usually don't pay attention to setting those spaces and new lines, and just
reformat the file all the time.
For existing files, you can select a code block that you've changed and
reformat it automatically in the same way. As you'll find out, some old files
have existing code style issues, and if the whole file is reformatted, a lot of
unrelated lines will get included in the PR.
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -644,6 +660,15 @@ public void apply(JobListener jobListener, JobContext
jobContext)
}
}
+ public static long sumWorkUnitsSizes (Collection<WorkUnit> workUnits) {
Review comment:
Looks like this method is public only to allow testing it. You can
add the @VisibleForTesting annotation to signify that. Otherwise folks will
suggest making it private.
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -463,6 +468,17 @@ public void apply(JobListener jobListener, JobContext
jobContext)
return;
}
+ // calculation of total bytes to copy in a job used to track a job's copy progress
+ if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_COPY_PROGRESS, false)) {
+ if (workUnitStream.isSafeToMaterialize()) {
+ Collection<WorkUnit> workUnits = JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
+ long totalSizeInBytes = sumWorkUnitsSizes(workUnits);
+ this.jobContext.getJobState().setProp(TOTAL_BYTES_TO_COPY, totalSizeInBytes);
+ } else {
+ throw new RuntimeException(DestinationDatasetHandlerService.class.getName() + " does not support work unit streams");
Review comment:
You can add more context here, so that when people see it in logs they
know what to do. For example, you can say that a specific property is turned
on, but progress cannot be collected because ... Then you can suggest that
people either turn property X off, or use a different Z to get this working.
Detailed error messages will reduce the support and on-call volume, and
create a better overall experience for users and SREs. I wrote more about this
in the doc -
https://docs.google.com/document/d/154girrJkI_hFNAKiHEyT3T4RtHclxqO15uzaXga74Uw/edit#heading=h.cfg1x3y98n7i
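To make the suggestion about actionable error messages concrete, here is a
minimal sketch of a message that states what is enabled, why it cannot work,
and what the user can do next. The class name, the method name, and the
property key passed in are hypothetical; the real string value behind
ConfigurationKeys.REPORT_JOB_COPY_PROGRESS is not shown in this diff, and the
suggested remedies are assumptions about what would be reasonable advice.

```java
/** Hypothetical message builder; the names and the wording are assumptions. */
final class ProgressErrorMessages {
  private ProgressErrorMessages() {
  }

  /**
   * Builds a message that names the enabled property, explains the failure,
   * and lists the options available to the user.
   */
  static String cannotMaterializeWorkUnits(String propertyKey) {
    return String.format(
        "Copy progress reporting is enabled (%1$s=true), but the total number of bytes to copy "
            + "cannot be computed because the work unit stream is not safe to materialize. "
            + "Either disable progress reporting (%1$s=false) or use a source whose work units "
            + "can be materialized up front.",
        propertyKey);
  }
}
```

The caller would pass the actual property key so the log line shows exactly
which setting to change.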
##########
File path:
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatistics.pdl
##########
@@ -19,4 +19,14 @@ record JobStatistics {
* number of records processed in the last job execution
*/
processedCount: long
+
+ /**
+ * data copy progress as a percentage
+ */
+ copyProgress: long
Review comment:
I assume this will go from 0 to 100, so we can use int here. You can
also add the possible range to the comment above for clarity.
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -206,12 +222,53 @@ public Void call() throws Exception {
LOGGER.info(String.format("Collected task state of %d completed tasks",
taskStateQueue.size()));
+ Long taskByteSize;
+ String stringSize;
+ Double newPercentageCopied;
// Add the TaskStates of completed tasks to the JobState so when the
control
// returns to the launcher, it sees the TaskStates of all completed tasks.
for (TaskState taskState : taskStateQueue) {
consumeTaskIssues(taskState);
taskState.setJobState(this.jobState);
this.jobState.addTaskState(taskState);
+
+ if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_COPY_PROGRESS, false)) {
+ stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+ if (stringSize == null) {
+ LOGGER.warn("Expected to report data copy progress but work unit byte size property null");
+ break;
+ }
+
+ taskByteSize = Long.parseLong(stringSize);
+ // if progress reporting is enabled, value should be present
+ String strTotalSizeToCopy = this.jobState.getProp(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
+ if (strTotalSizeToCopy == null) {
+ LOGGER.warn("Expected to report data copy progress but total bytes to copy property null");
+ break;
+ }
+
+ this.totalSizeToCopy = Long.parseLong(strTotalSizeToCopy);
+ // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
+ this.bytesCopiedSoFar += taskByteSize;
+ newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+ if (newPercentageCopied - this.lastPercentageReported > ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD) {
+ this.lastPercentageReported = newPercentageCopied;
+ Long percentageToReport = (long) (this.lastPercentageReported * 100);
Review comment:
Should we use Math.round() here? I think by default the value will be
truncated when you convert double to long, so 99.999 will be 99, not 100. And
we can end up with jobs that are fully complete but still show 99% progress.
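The difference between the cast and Math.round() can be seen in a small
sketch. The class and method names are made up for illustration; only the
cast expression mirrors the line under review.

```java
/** Hypothetical comparison of the two conversions; the names are assumptions. */
final class PercentConversion {
  private PercentConversion() {
  }

  /** Mirrors the cast in the diff: the fractional part is discarded. */
  static long truncated(double fractionCopied) {
    return (long) (fractionCopied * 100);
  }

  /** Rounds to the nearest whole percent instead of discarding the fraction. */
  static long rounded(double fractionCopied) {
    return Math.round(fractionCopied * 100);
  }
}
```

For a fraction of 0.99999 the cast gives 99 while Math.round() gives 100. One
trade-off to keep in mind is that rounding also shows 100 for anything from
99.5% upward, so it may be worth reporting 100 only once the job has actually
finished.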
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -206,12 +222,53 @@ public Void call() throws Exception {
LOGGER.info(String.format("Collected task state of %d completed tasks",
taskStateQueue.size()));
+ Long taskByteSize;
+ String stringSize;
+ Double newPercentageCopied;
// Add the TaskStates of completed tasks to the JobState so when the
control
// returns to the launcher, it sees the TaskStates of all completed tasks.
for (TaskState taskState : taskStateQueue) {
consumeTaskIssues(taskState);
taskState.setJobState(this.jobState);
this.jobState.addTaskState(taskState);
+
Review comment:
This method is already quite complicated. Generally, we should try to
have no more than a screen of code in each method, to make it easier to read
and understand. You can try moving the logic to a separate method or class,
whichever makes more sense.
Fields like "lastPercentageReported" can be more meaningful in a separate
class, rather than in the task state collector. That will better match the
single responsibility principle - https://en.wikipedia.org/wiki/SOLID
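One possible shape for such a separate class is sketched below. It owns the
running byte count and the last reported value, and tells the caller when a
new percentage is worth emitting. The class name CopyProgressTracker, its
method, and the treatment of the threshold as a fraction between 0 and 1 are
assumptions for illustration; the actual value and unit of
DEFAULT_PROGRESS_REPORTING_THRESHOLD are not visible in this diff.

```java
import java.util.OptionalLong;

/** Hypothetical tracker extracted from the collector; the names are assumptions. */
final class CopyProgressTracker {
  private final long totalBytesToCopy;
  private final double reportingThreshold; // assumed to be a fraction, e.g. 0.05
  private long bytesCopiedSoFar;
  private double lastFractionReported;

  CopyProgressTracker(long totalBytesToCopy, double reportingThreshold) {
    if (totalBytesToCopy <= 0) {
      throw new IllegalArgumentException("totalBytesToCopy must be positive");
    }
    this.totalBytesToCopy = totalBytesToCopy;
    this.reportingThreshold = reportingThreshold;
  }

  /**
   * Records one completed task. Returns the percentage to report when progress
   * has advanced by more than the threshold since the last report, and an
   * empty value otherwise.
   */
  OptionalLong recordCompletedTask(long taskBytes) {
    bytesCopiedSoFar += taskBytes;
    // Cast before dividing so the result is not truncated by integer division.
    double fraction = (double) bytesCopiedSoFar / totalBytesToCopy;
    if (fraction - lastFractionReported > reportingThreshold) {
      lastFractionReported = fraction;
      return OptionalLong.of(Math.round(fraction * 100));
    }
    return OptionalLong.empty();
  }
}
```

The collector loop would then only parse the task's byte size, call
recordCompletedTask, and submit an event when a value is present.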
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
##########
@@ -163,6 +163,9 @@ public GobblinTrackingEvent
deserializeEvent(DecodeableKafkaRecord<byte[],byte[]
properties.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.CANCELLED.name());
properties.put(TimingEvent.JOB_END_TIME,
properties.getProperty(TimingEvent.METADATA_END_TIME));
break;
+ case TimingEvent.JOB_COMPLETION_PERCENTAGE:
+ properties.put(TimingEvent.JOB_LAST_PROGRESS_PERCENT_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
Review comment:
Is METADATA_END_TIME the correct value here?
Issue Time Tracking
-------------------
Worklog Id: (was: 626948)
Time Spent: 1h (was: 50m)
> Data Copy Progress Reporting
> -----------------------------
>
> Key: GOBBLIN-1493
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1493
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-core, gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
> Progress reporting for a data copy will provide users with quantitative
> feedback on the progress of a data copy job as a percentage as well as an
> estimate of the time remaining for completion. This will update the existing
> job status endpoint to include the progress percentage and estimate of time
> left.