This is an automated email from the ASF dual-hosted git repository.

abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f7fe367c40 DQ handling for no bytes read scenarios and metrics for bytes read/written (#4135)
f7fe367c40 is described below

commit f7fe367c4089ae7b0895c442d5e6a18b872b5761
Author: vsinghal85 <[email protected]>
AuthorDate: Mon Aug 25 12:08:39 2025 +0530

    DQ handling for no bytes read scenarios and metrics for bytes read/written (#4135)
    
    * introduce not evaluated flag for null bytes read case and bytes read/written metrics
    
    ---------
    
    Co-authored-by: Vaibhav Singhal <[email protected]>
---
 .../qualitychecker/task/TaskLevelPolicy.java       |  3 ++-
 .../gobblin/policies/size/FileSizePolicy.java      | 26 +++++++++++++-------
 .../gobblin/policies/size/FileSizePolicyTest.java  | 17 ++++++++++---
 .../apache/gobblin/metrics/ServiceMetricNames.java |  2 ++
 .../gobblin/quality/DataQualityEvaluator.java      | 28 ++++++++++++++++++----
 .../main/java/org/apache/gobblin/runtime/Task.java | 26 ++++++++++++++++++--
 .../java/org/apache/gobblin/runtime/fork/Fork.java | 24 +++++++++++++++----
 7 files changed, 102 insertions(+), 24 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicy.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicy.java
index b5043e01e8..de53f26485 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicy.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicy.java
@@ -32,7 +32,8 @@ public abstract class TaskLevelPolicy {
 
   public enum Result {
     PASSED, // The test passed
-    FAILED // The test failed
+    FAILED, // The test failed
+    NOT_EVALUATED // The test was not evaluated
   }
 
   public TaskLevelPolicy(State state, TaskLevelPolicy.Type type) {
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java
index d5f935d3bc..550f2299a3 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java
@@ -44,12 +44,12 @@ public class FileSizePolicy extends TaskLevelPolicy {
   public Result executePolicy() {
     TransferBytes transferBytes = 
getBytesReadAndWritten(this.state).orElse(null);
     if (transferBytes == null) {
-      return Result.FAILED;
+      return Result.NOT_EVALUATED;
     }
-    Long bytesRead = transferBytes.getBytesRead();
-    Long bytesWritten = transferBytes.getBytesWritten();
+    long bytesRead = transferBytes.getBytesRead();
+    long bytesWritten = transferBytes.getBytesWritten();
 
-    Long sizeDifference = Math.abs(bytesRead - bytesWritten);
+    long sizeDifference = Math.abs(bytesRead - bytesWritten);
 
     if (sizeDifference == 0) {
       return Result.PASSED;
@@ -67,7 +67,7 @@ public class FileSizePolicy extends TaskLevelPolicy {
       return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]", 
transferBytes.getBytesRead(),
           transferBytes.getBytesWritten());
     } else {
-      return "FileSizePolicy [bytesRead=null, bytesWritten=null]";
+      return "Transfer bytes information not available";
     }
   }
 
@@ -87,18 +87,28 @@ public class FileSizePolicy extends TaskLevelPolicy {
 
   /**
    * Extracts bytesRead and bytesWritten from the given state.
+   * If bytesRead is null/zero, skip data quality check by returning 
Optional.empty().
    * Returns Empty Optional if parsing fails.
    */
   private Optional<TransferBytes> getBytesReadAndWritten(State state) {
     String bytesReadString = state.getProp(BYTES_READ_KEY);
     String bytesWrittenString = state.getProp(BYTES_WRITTEN_KEY);
-    if (bytesReadString == null || bytesWrittenString == null) {
-      log.error("Missing value(s): bytesReadStr={}, bytesWrittenStr={}", 
bytesReadString, bytesWrittenString);
+    if (bytesReadString == null) {
+      log.error("Missing value(s): bytesReadStr=null, bytesWrittenStr={}", 
bytesWrittenString);
       return Optional.empty();
     }
     try {
       long bytesRead = Long.parseLong(bytesReadString);
-      long bytesWritten = Long.parseLong(bytesWrittenString);
+      if (bytesRead == 0) {
+        log.warn("Bytes read is zero, skipping file size check.");
+        return Optional.empty();
+      }
+      long bytesWritten = 0;
+      if (bytesWrittenString == null) {
+        log.error("Missing bytesWritten value: bytesWrittenStr=null, assuming 
0 bytes written.");
+      } else {
+        bytesWritten = Long.parseLong(bytesWrittenString);
+      }
       return Optional.of(new TransferBytes(bytesRead, bytesWritten));
     } catch (NumberFormatException e) {
       log.error("Invalid number format for bytesRead or bytesWritten: 
bytesRead='{}', bytesWritten='{}'",
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java
index 0129891748..5a4d421215 100644
--- 
a/gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java
@@ -49,7 +49,7 @@ public class FileSizePolicyTest {
     State state = new State();
     // No properties set at all
     FileSizePolicy policy = new FileSizePolicy(state, 
TaskLevelPolicy.Type.FAIL);
-    Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
+    Assert.assertEquals(policy.executePolicy(), 
TaskLevelPolicy.Result.NOT_EVALUATED);
   }
 
   @Test
@@ -64,9 +64,20 @@ public class FileSizePolicyTest {
     // Reset state and only set bytes written, not bytes read
     state = new State();
     state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 1000L);
-
+    // bytes read is null
     policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL);
-    Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
+    Assert.assertEquals(policy.executePolicy(), 
TaskLevelPolicy.Result.NOT_EVALUATED);
+  }
+
+  @Test
+  public void testEmptyDirectoryHandling() {
+    State state = new State();
+    // Test case: Empty directory with 0 bytes read, no bytes written 
(directory case)
+    state.setProp(FileSizePolicy.BYTES_READ_KEY, 0L);
+    // Don't set BYTES_WRITTEN_KEY to simulate directory copy scenario
+
+    FileSizePolicy policy = new FileSizePolicy(state, 
TaskLevelPolicy.Type.FAIL);
+    Assert.assertEquals(policy.executePolicy(), 
TaskLevelPolicy.Result.NOT_EVALUATED);
   }
 
 }
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 57e95508b6..7809e785f5 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -28,6 +28,8 @@ public class ServiceMetricNames {
   public static final String DATA_QUALITY_SUCCESS_FILE_COUNT = 
"dataQualitySuccessFileCount";
   public static final String DATA_QUALITY_FAILURE_FILE_COUNT = 
"dataQualityFailureFileCount";
   public static final String DATA_QUALITY_NON_EVALUATED_FILE_COUNT = 
"dataQualityNonEvaluatedFileCount";
+  public static final String DATA_QUALITY_BYTES_READ = "dataQualityBytesRead";
+  public static final String DATA_QUALITY_BYTES_WRITTEN = 
"dataQualityBytesWritten";
 
   // Flow Compilation Meters and Timer
   public static final String FLOW_COMPILATION_SUCCESSFUL_METER = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowCompilation.successful";
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java
index 959f57053c..23590789a0 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java
@@ -32,6 +32,7 @@ import org.apache.gobblin.metrics.OpenTelemetryMetrics;
 import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.policies.size.FileSizePolicy;
 import org.apache.gobblin.qualitychecker.DataQualityStatus;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.TaskState;
@@ -66,6 +67,10 @@ public class DataQualityEvaluator {
     private final int failedFiles;
     // Number of files that were not evaluated for data quality for example 
files not found or not processed
     private final int nonEvaluatedFiles;
+    // total bytes read
+    private  final long bytesRead;
+    // total bytes written
+    private final long bytesWritten;
   }
 
   /**
@@ -85,7 +90,7 @@ public class DataQualityEvaluator {
     jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
result.getQualityStatus().name());
     // Emit dataset-specific metrics
     emitMetrics(jobState, result.getQualityStatus(), result.getTotalFiles(), 
result.getPassedFiles(),
-        result.getFailedFiles(), result.getNonEvaluatedFiles(), 
datasetState.getDatasetUrn());
+        result.getFailedFiles(), result.getNonEvaluatedFiles(), 
result.getBytesRead(), result.getBytesWritten(), datasetState.getDatasetUrn());
 
     return result;
   }
@@ -103,17 +108,19 @@ public class DataQualityEvaluator {
     int failedFilesCount = 0;
     int passedFilesCount = 0;
     int nonEvaluatedFilesCount = 0;
+    long bytesRead = 0;
+    long bytesWritten = 0;
 
     for (TaskState taskState : taskStates) {
       totalFiles++;
-
       // Handle null task states gracefully
       if (taskState == null) {
         log.warn("Encountered null task state, skipping data quality 
evaluation for this task");
         nonEvaluatedFilesCount++;
         continue;
       }
-
+      bytesRead += taskState.getPropAsLong(FileSizePolicy.BYTES_READ_KEY, 0L);
+      bytesWritten += 
taskState.getPropAsLong(FileSizePolicy.BYTES_WRITTEN_KEY, 0L);
       DataQualityStatus taskDataQuality = null;
       String result = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
       taskDataQuality = DataQualityStatus.fromString(result);
@@ -138,11 +145,11 @@ public class DataQualityEvaluator {
     log.info("Data quality evaluation summary - Total: {}, Passed: {}, Failed: 
{}, Not Evaluated: {}", totalFiles,
         passedFilesCount, failedFilesCount, nonEvaluatedFilesCount);
     return new DataQualityEvaluationResult(jobDataQualityStatus, totalFiles, 
passedFilesCount, failedFilesCount,
-        nonEvaluatedFilesCount);
+        nonEvaluatedFilesCount, bytesRead, bytesWritten);
   }
 
   private static void emitMetrics(JobState jobState, final DataQualityStatus 
jobDataQuality, final int totalFiles,
-      final int passedFilesCount, final int failedFilesCount, final int 
nonEvaluatedFilesCount,
+      final int passedFilesCount, final int failedFilesCount, final int 
nonEvaluatedFilesCount, final long bytesRead, final long bytesWritten,
       final String datasetUrn) {
     try {
       // Check if OpenTelemetry is enabled
@@ -187,6 +194,17 @@ public class DataQualityEvaluator {
       
meter.counterBuilder(ServiceMetricNames.DATA_QUALITY_NON_EVALUATED_FILE_COUNT)
           .setDescription("Number of files that did not have data quality 
evaluation").build()
           .add(nonEvaluatedFilesCount, tags);
+
+      // Emit bytes read
+      meter.counterBuilder(ServiceMetricNames.DATA_QUALITY_BYTES_READ)
+          .setDescription("Total bytes read").build()
+          .add(bytesRead, tags);
+
+      // Emit bytes written
+      meter.counterBuilder(ServiceMetricNames.DATA_QUALITY_BYTES_WRITTEN)
+          .setDescription("Total bytes written").build()
+          .add(bytesWritten, tags);
+
     } catch (Exception e) {
       log.error("Error in emitMetrics for job: {}", jobState.getJobName(), e);
     }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 2dd2e83f5d..76d8f79fa6 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -320,6 +320,8 @@ public class Task implements TaskIFace {
 
   private void computeAndUpdateTaskDataQuality() {
     DataQualityStatus overallTaskDataQuality = DataQualityStatus.PASSED;
+    boolean hasEvaluatedForks = false;
+
     for (Optional<Fork> fork : this.forks.keySet()) {
       if (fork.isPresent()) {
         TaskState forkTaskState = fork.get().getTaskState();
@@ -329,17 +331,37 @@ public class Task implements TaskIFace {
           try {
             if (result != null) {
               forkDataQualityStatus = DataQualityStatus.valueOf(result);
+            } else {
+              forkDataQualityStatus = DataQualityStatus.NOT_EVALUATED;
             }
           } catch (IllegalArgumentException e) {
             Log.warn("Unknown data quality status encountered: " + result);
             forkDataQualityStatus = DataQualityStatus.UNKNOWN;
           }
-          if (DataQualityStatus.FAILED == forkDataQualityStatus) {
-            overallTaskDataQuality = DataQualityStatus.FAILED;
+          /*
+           * If any fork fails, overall status should be FAILED
+           * FAILED status cannot be overridden by subsequent successes
+           * Handle NOT_EVALUATED and UNKNOWN appropriately
+           * If forkDataQualityStatus is PASSED and overall is not 
FAILED/UNKNOWN, keep PASSED
+           */
+          if (forkDataQualityStatus != DataQualityStatus.NOT_EVALUATED) {
+            hasEvaluatedForks = true;
+            if (forkDataQualityStatus == DataQualityStatus.FAILED) {
+              overallTaskDataQuality = DataQualityStatus.FAILED;
+            } else if (forkDataQualityStatus == DataQualityStatus.UNKNOWN &&
+                       overallTaskDataQuality != DataQualityStatus.FAILED) {
+              overallTaskDataQuality = DataQualityStatus.UNKNOWN;
+            }
           }
         }
       }
     }
+
+    // If no forks were evaluated, set overall task status to NOT_EVALUATED
+    if (!hasEvaluatedForks) {
+      overallTaskDataQuality = DataQualityStatus.NOT_EVALUATED;
+    }
+
     LOG.info("Data quality state of the task is {}", overallTaskDataQuality);
     this.taskState.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY, 
overallTaskDataQuality.name());
   }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index 0ff7cde453..81e2d69e2c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -630,11 +630,7 @@ public class Fork<S, D> implements Closeable, FinalState, 
RecordStreamConsumer<S
       TaskLevelPolicyCheckResults taskResults =
           this.taskContext.getTaskLevelPolicyChecker(this.forkTaskState, 
this.branches > 1 ? this.index : -1)
               .executePolicies();
-      boolean hasFailureForMandatoryPolicy = taskResults.getPolicyResults()
-          .getOrDefault(TaskLevelPolicy.Result.FAILED, 
java.util.Collections.emptySet())
-          .contains(TaskLevelPolicy.Type.FAIL);
-      forkTaskState.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY,
-          hasFailureForMandatoryPolicy ? DataQualityStatus.FAILED.name() : 
DataQualityStatus.PASSED.name());
+      this.computeAndUpdateForkDataQuality(taskResults);
       TaskPublisher publisher = 
this.taskContext.getTaskPublisher(this.forkTaskState, taskResults);
       switch (publisher.canPublish()) {
         case SUCCESS:
@@ -659,6 +655,24 @@ public class Fork<S, D> implements Closeable, FinalState, 
RecordStreamConsumer<S
     }
   }
 
+  private void computeAndUpdateForkDataQuality(TaskLevelPolicyCheckResults 
taskResults) {
+    boolean hasFailureForMandatoryPolicy =
+        
taskResults.getPolicyResults().getOrDefault(TaskLevelPolicy.Result.FAILED, 
java.util.Collections.emptySet())
+            .contains(TaskLevelPolicy.Type.FAIL);
+    boolean hasNotEvaluatedForMandatoryPolicy = taskResults.getPolicyResults()
+        .getOrDefault(TaskLevelPolicy.Result.NOT_EVALUATED, 
java.util.Collections.emptySet())
+        .contains(TaskLevelPolicy.Type.FAIL);
+    String forkLevelDataQualityResult;
+    if (hasFailureForMandatoryPolicy) {
+      forkLevelDataQualityResult = DataQualityStatus.FAILED.name();
+    } else if (hasNotEvaluatedForMandatoryPolicy) {
+      forkLevelDataQualityResult = DataQualityStatus.NOT_EVALUATED.name();
+    } else {
+      forkLevelDataQualityResult = DataQualityStatus.PASSED.name();
+    }
+    forkTaskState.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY, 
forkLevelDataQualityResult);
+  }
+
   /**
    * Commit task data.
    */

Reply via email to