Copilot commented on code in PR #4113:
URL: https://github.com/apache/gobblin/pull/4113#discussion_r2097475681


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java:
##########
@@ -236,6 +237,9 @@ protected void writeImpl(InputStream inputStream, Path writeAt, CopyableFile cop
     final long blockSize = copyableFile.getBlockSize(this.fs);
     final long fileSize = copyableFile.getFileStatus().getLen();
 
+    // Store source file size in task state
+    this.state.setProp(FileSizePolicy.BYTES_READ_KEY, fileSize);

Review Comment:
   Using setProp here overwrites the bytes-read count for each file instead of 
accumulating across multiple writes. Consider summing the file sizes (e.g., 
getPropAsLong + new size) to track total bytes read.
   ```suggestion
       // Accumulate source file size in task state
       long currentBytesRead = this.state.getPropAsLong(FileSizePolicy.BYTES_READ_KEY, 0L);
       this.state.setProp(FileSizePolicy.BYTES_READ_KEY, currentBytesRead + fileSize);
   ```



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java:
##########
@@ -308,6 +312,8 @@ public boolean apply(FileStatus input) {
         os.close();
         log.info("OutputStream for file {} is closed.", writeAt);
         inputStream.close();
+        long actualFileSize = this.fs.getFileStatus(writeAt).getLen();
+        this.state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, actualFileSize);

Review Comment:
   Similarly, this overwrites the bytes-written count on each write. Consider 
accumulating the sizes so the final state reflects the sum of all writes.
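
   A minimal sketch of the difference between overwriting and accumulating, which
applies to both the bytes-read and the bytes-written comments. The class below
is a hypothetical stand-in for the task state, backed by `java.util.Properties`;
the key name and the helper methods are illustrative assumptions, not the real
`FileSizePolicy` constants or Gobblin classes.

```java
import java.util.Properties;

// Hypothetical stand-in for the writer's task state: only setProp and
// getPropAsLong are mimicked, backed by java.util.Properties.
final class ByteCounterSketch {
  // Illustrative key, not the real FileSizePolicy constant.
  static final String BYTES_WRITTEN_KEY = "sketch.bytes.written";

  private final Properties props = new Properties();

  void setProp(String key, Object value) {
    props.setProperty(key, String.valueOf(value));
  }

  long getPropAsLong(String key, long defaultValue) {
    String value = props.getProperty(key);
    return value == null ? defaultValue : Long.parseLong(value);
  }

  // Behavior in the diff: each file replaces the previously stored value.
  void recordOverwriting(long fileSize) {
    setProp(BYTES_WRITTEN_KEY, fileSize);
  }

  // Suggested behavior: read the running total, add the new size, write it back.
  void recordAccumulating(long fileSize) {
    long soFar = getPropAsLong(BYTES_WRITTEN_KEY, 0L);
    setProp(BYTES_WRITTEN_KEY, soFar + fileSize);
  }

  // Feeds the given file sizes through one strategy and returns the stored value.
  static long storedAfter(boolean accumulate, long... fileSizes) {
    ByteCounterSketch state = new ByteCounterSketch();
    for (long size : fileSizes) {
      if (accumulate) {
        state.recordAccumulating(size);
      } else {
        state.recordOverwriting(size);
      }
    }
    return state.getPropAsLong(BYTES_WRITTEN_KEY, 0L);
  }

  public static void main(String[] args) {
    System.out.println(storedAfter(false, 100L, 250L, 50L)); // last file only
    System.out.println(storedAfter(true, 100L, 250L, 50L));  // sum of all files
  }
}
```

   With file sizes 100, 250, and 50, the overwriting variant ends at 50 and the
accumulating variant at 400. The read-add-write step is not atomic, so the
sketch assumes only one thread updates a given state at a time.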



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java:
##########
@@ -352,6 +352,9 @@ public boolean isParentTaskDone() {
     return parentTaskDone;
   }
 
+

Review Comment:
   Add a brief JavaDoc to explain the purpose and usage of `getTaskState()`, 
since it's a new public API.
   ```suggestion
   
     /**
      * Get the {@link TaskState} associated with this {@link Fork}.
      *
      * <p>
      *   The {@link TaskState} contains information about the execution state of the parent {@link Task},
      *   including metrics and other runtime data. This method allows access to the {@link TaskState}
      *   for monitoring or reporting purposes.
      * </p>
      *
      * @return the {@link TaskState} of the parent {@link Task}.
      */
   ```



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java:
##########
@@ -316,6 +317,23 @@ private void completeShutdown() {
     this.shutdownLatch.countDown();
   }
 
+  private void computeAndUpdateTaskDataQuality() {
+    String overallTaskDataQuality = TaskLevelPolicyChecker.DataQualityStatus.PASSED.name();
+    for (Optional<Fork> fork : this.forks.keySet()) {
+      if (fork.isPresent()) {
+        TaskState forkTaskState = fork.get().getTaskState();
+        if (forkTaskState != null) {
+          String forkDataQualityResult = forkTaskState.getProp(TaskLevelPolicyChecker.TASK_LEVEL_POLICY_RESULT_KEY);
+          if (forkDataQualityResult != null && TaskLevelPolicyChecker.DataQualityStatus.FAILED.name().equals(forkDataQualityResult)) {
+            overallTaskDataQuality = TaskLevelPolicyChecker.DataQualityStatus.FAILED.name();
+          }
+        }
+      }
+    }
+    LOG.info("Data quality state of the task is " + overallTaskDataQuality);

Review Comment:
   Use parameterized logging instead of string concatenation, so the message is
only formatted when the log level is enabled.
   ```suggestion
       LOG.info("Data quality state of the task is {}", overallTaskDataQuality);
   ```



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java:
##########
@@ -90,7 +91,8 @@ public void onTaskRunCompletion(Task task) {
       // Check the task state and handle task retry if task failed and
       // it has not reached the maximum number of retries
       WorkUnitState.WorkingState state = task.getTaskState().getWorkingState();
-      if (state == WorkUnitState.WorkingState.FAILED && task.getRetryCount() < this.maxTaskRetries) {
+      String dataQualityResult = task.getTaskState().getProp(TaskLevelPolicyChecker.TASK_LEVEL_POLICY_RESULT_KEY);
+      if (TaskLevelPolicyChecker.DataQualityStatus.FAILED.name().equals(dataQualityResult) || state == WorkUnitState.WorkingState.FAILED && task.getRetryCount() < this.maxTaskRetries) {

Review Comment:
   [nitpick] Mixing `||` and `&&` without parentheses is easy to misread.
Because `&&` binds more tightly than `||`, the condition already evaluates as
`<data quality FAILED> || (state == FAILED && retries remaining)`. Consider
adding explicit parentheses around the retry condition so that grouping is
visible; this does not change behavior.
   ```suggestion
         if (TaskLevelPolicyChecker.DataQualityStatus.FAILED.name().equals(dataQualityResult) || (state == WorkUnitState.WorkingState.FAILED && task.getRetryCount() < this.maxTaskRetries)) {
   ```
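
   A small sketch of the precedence point, for anyone who wants to check it. The
class and parameter names are hypothetical; the three booleans stand for "data
quality failed", "working state is FAILED", and "retry count is below the
maximum".

```java
// Hypothetical illustration of how Java groups a || b && c.
final class RetryConditionSketch {
  // The condition as written in the diff, without parentheses.
  static boolean unparenthesized(boolean dqFailed, boolean stateFailed, boolean retriesLeft) {
    return dqFailed || stateFailed && retriesLeft;
  }

  // The suggested form, with the retry condition grouped explicitly.
  static boolean parenthesized(boolean dqFailed, boolean stateFailed, boolean retriesLeft) {
    return dqFailed || (stateFailed && retriesLeft);
  }

  // The other possible reading, which Java does NOT apply by default.
  static boolean leftGrouped(boolean dqFailed, boolean stateFailed, boolean retriesLeft) {
    return (dqFailed || stateFailed) && retriesLeft;
  }

  // Compares the first two forms over all eight input combinations.
  static boolean formsAlwaysAgree() {
    boolean[] values = {false, true};
    for (boolean a : values) {
      for (boolean b : values) {
        for (boolean c : values) {
          if (unparenthesized(a, b, c) != parenthesized(a, b, c)) {
            return false;
          }
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(formsAlwaysAgree());
    System.out.println(parenthesized(true, false, false));
    System.out.println(leftGrouped(true, false, false));
  }
}
```

   The unparenthesized and parenthesized forms agree on every input, so the
suggestion is purely a readability change. Under either spelling, a data quality
failure takes this branch even when no retries remain; whether that is intended
is for the PR author to confirm.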



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

