Copilot commented on code in PR #4113:
URL: https://github.com/apache/gobblin/pull/4113#discussion_r2097475681
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java:
##########
@@ -236,6 +237,9 @@ protected void writeImpl(InputStream inputStream, Path writeAt, CopyableFile cop
     final long blockSize = copyableFile.getBlockSize(this.fs);
     final long fileSize = copyableFile.getFileStatus().getLen();
+    // Store source file size in task state
+    this.state.setProp(FileSizePolicy.BYTES_READ_KEY, fileSize);

Review Comment:
Using `setProp` here overwrites the bytes-read count for each file instead of accumulating across multiple writes. Consider summing the file sizes (e.g., `getPropAsLong` + new size) to track total bytes read.

```suggestion
    // Accumulate source file size in task state
    long currentBytesRead = this.state.getPropAsLong(FileSizePolicy.BYTES_READ_KEY, 0L);
    this.state.setProp(FileSizePolicy.BYTES_READ_KEY, currentBytesRead + fileSize);
```

##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java:
##########
@@ -308,6 +312,8 @@ public boolean apply(FileStatus input) {
     os.close();
     log.info("OutputStream for file {} is closed.", writeAt);
     inputStream.close();
+    long actualFileSize = this.fs.getFileStatus(writeAt).getLen();
+    this.state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, actualFileSize);

Review Comment:
Similarly, this overwrites the bytes-written count on each write. Consider accumulating the sizes so the final state reflects the sum of all writes.

##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java:
##########
@@ -352,6 +352,9 @@ public boolean isParentTaskDone() {
     return parentTaskDone;
   }
+

Review Comment:
Add a brief JavaDoc to explain the purpose and usage of `getTaskState()`, since it's a new public API.

```suggestion
  /**
   * Get the {@link TaskState} associated with this {@link Fork}.
   *
   * <p>
   * The {@link TaskState} contains information about the execution state of the parent {@link Task},
   * including metrics and other runtime data. This method allows access to the {@link TaskState}
   * for monitoring or reporting purposes.
   * </p>
   *
   * @return the {@link TaskState} of the parent {@link Task}.
   */
```

##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java:
##########
@@ -316,6 +317,23 @@ private void completeShutdown() {
     this.shutdownLatch.countDown();
   }
+  private void computeAndUpdateTaskDataQuality() {
+    String overallTaskDataQuality = TaskLevelPolicyChecker.DataQualityStatus.PASSED.name();
+    for (Optional<Fork> fork : this.forks.keySet()) {
+      if (fork.isPresent()) {
+        TaskState forkTaskState = fork.get().getTaskState();
+        if (forkTaskState != null) {
+          String forkDataQualityResult = forkTaskState.getProp(TaskLevelPolicyChecker.TASK_LEVEL_POLICY_RESULT_KEY);
+          if (forkDataQualityResult != null && TaskLevelPolicyChecker.DataQualityStatus.FAILED.name().equals(forkDataQualityResult)) {
+            overallTaskDataQuality = TaskLevelPolicyChecker.DataQualityStatus.FAILED.name();
+          }
+        }
+      }
+    }
+    LOG.info("Data quality state of the task is " + overallTaskDataQuality);

Review Comment:
Use parameterized logging instead of string concatenation: `LOG.info("Data quality state of the task is {}", overallTaskDataQuality);`.
```suggestion
    LOG.info("Data quality state of the task is {}", overallTaskDataQuality);
```

##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java:
##########
@@ -90,7 +91,8 @@ public void onTaskRunCompletion(Task task) {
     // Check the task state and handle task retry if task failed and
     // it has not reached the maximum number of retries
     WorkUnitState.WorkingState state = task.getTaskState().getWorkingState();
-    if (state == WorkUnitState.WorkingState.FAILED && task.getRetryCount() < this.maxTaskRetries) {
+    String dataQualityResult = task.getTaskState().getProp(TaskLevelPolicyChecker.TASK_LEVEL_POLICY_RESULT_KEY);
+    if (TaskLevelPolicyChecker.DataQualityStatus.FAILED.name().equals(dataQualityResult) || state == WorkUnitState.WorkingState.FAILED && task.getRetryCount() < this.maxTaskRetries) {

Review Comment:
[nitpick] Mixing `||` and `&&` without parentheses can be unclear. Consider adding parentheses around the retry condition, i.e. `<data quality FAILED> || (state == FAILED && <retries remaining>)`. Because `&&` binds more tightly than `||` in Java, this does not change behavior; it only makes the intended grouping explicit.

```suggestion
    if (TaskLevelPolicyChecker.DataQualityStatus.FAILED.name().equals(dataQualityResult) || (state == WorkUnitState.WorkingState.FAILED && task.getRetryCount() < this.maxTaskRetries)) {
```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
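The two `FileAwareInputStreamDataWriter` comments above suggest accumulating the byte counts instead of overwriting them. The standalone sketch below illustrates that pattern under several assumptions. A `HashMap<String, String>` stands in for the task `State`, and values are stored as strings in this sketch. `BYTES_READ_KEY` and `BYTES_WRITTEN_KEY` are placeholder strings, not the real `FileSizePolicy` constants. `addToProp` and `recordFile` are hypothetical helpers. The sketch also assumes only one thread updates a given state object, because each update reads and then writes the same key.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of accumulating per-file byte counts in a property map.
 * The map stands in for the task state; this is not Gobblin code.
 */
public class ByteCountAccumulator {

  // Placeholder keys (assumption): the real ones are FileSizePolicy.BYTES_READ_KEY / BYTES_WRITTEN_KEY.
  static final String BYTES_READ_KEY = "bytes.read";
  static final String BYTES_WRITTEN_KEY = "bytes.written";

  private final Map<String, String> props = new HashMap<>();

  // Mirrors the getPropAsLong(key, default) call used in the review suggestion.
  long getPropAsLong(String key, long defaultValue) {
    String value = props.get(key);
    return value == null ? defaultValue : Long.parseLong(value);
  }

  // Hypothetical helper: read the current total (0 if unset), add delta, write it back.
  void addToProp(String key, long delta) {
    props.put(key, String.valueOf(getPropAsLong(key, 0L) + delta));
  }

  // Hypothetical helper, called once per copied file, that adds both sizes to the running totals.
  void recordFile(long sourceSize, long writtenSize) {
    addToProp(BYTES_READ_KEY, sourceSize);
    addToProp(BYTES_WRITTEN_KEY, writtenSize);
  }

  public static void main(String[] args) {
    ByteCountAccumulator state = new ByteCountAccumulator();
    state.recordFile(100L, 100L);
    state.recordFile(250L, 240L);
    // A plain overwrite would leave 250 and 240 (the last file only).
    System.out.println(state.getPropAsLong(BYTES_READ_KEY, 0L));    // prints 350
    System.out.println(state.getPropAsLong(BYTES_WRITTEN_KEY, 0L)); // prints 340
  }
}
```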
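The roll-up rule in `computeAndUpdateTaskDataQuality()` from the `Task.java` hunk above is: the task is FAILED if any present fork reports FAILED, and PASSED otherwise. The standalone sketch below illustrates that rule and is not Gobblin code. `DataQualityStatus` and `TASK_LEVEL_POLICY_RESULT_KEY` are hypothetical stand-ins for the `TaskLevelPolicyChecker` members, and the key string is a placeholder. Each fork's `TaskState` is modelled as a `Map<String, String>`. `java.util.Optional` replaces whichever `Optional` type `Task` actually uses.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified model of rolling fork-level data-quality results up to the task. */
public class TaskDataQualitySketch {

  // Hypothetical stand-in for TaskLevelPolicyChecker.DataQualityStatus.
  enum DataQualityStatus { PASSED, FAILED }

  // Placeholder (assumption) for TaskLevelPolicyChecker.TASK_LEVEL_POLICY_RESULT_KEY.
  static final String TASK_LEVEL_POLICY_RESULT_KEY = "task.level.policy.result";

  /** Returns FAILED if any present fork reports FAILED, otherwise PASSED. */
  static DataQualityStatus aggregate(List<Optional<Map<String, String>>> forkStates) {
    for (Optional<Map<String, String>> fork : forkStates) {
      if (!fork.isPresent()) {
        continue; // an absent fork does not affect the result
      }
      // equals() on the constant side is null-safe: a fork with no recorded result is not FAILED.
      String result = fork.get().get(TASK_LEVEL_POLICY_RESULT_KEY);
      if (DataQualityStatus.FAILED.name().equals(result)) {
        return DataQualityStatus.FAILED; // one failing fork is enough
      }
    }
    return DataQualityStatus.PASSED;
  }

  public static void main(String[] args) {
    Map<String, String> passed = Collections.singletonMap(TASK_LEVEL_POLICY_RESULT_KEY, "PASSED");
    Map<String, String> failed = Collections.singletonMap(TASK_LEVEL_POLICY_RESULT_KEY, "FAILED");
    Map<String, String> unset = Collections.emptyMap();

    List<Optional<Map<String, String>>> healthy = Arrays.asList(Optional.of(passed), Optional.of(unset));
    List<Optional<Map<String, String>>> mixed = Arrays.asList(Optional.of(passed), Optional.empty(), Optional.of(failed));

    System.out.println(aggregate(healthy)); // prints PASSED
    System.out.println(aggregate(mixed));   // prints FAILED
  }
}
```

Returning at the first FAILED fork gives the same result as the hunk's loop, which keeps scanning after a failure. The early return only skips the remaining lookups.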