This is an automated email from the ASF dual-hosted git repository.

vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c4d2ba612 [GOBBLIN-2204] Implement FileSize Data Quality for 
FileBasedCopy
4c4d2ba612 is described below

commit 4c4d2ba61261c25de4930d96ba186874bf312082
Author: vsinghal85 <[email protected]>
AuthorDate: Tue Aug 5 20:55:11 2025 +0530

    [GOBBLIN-2204] Implement FileSize Data Quality for FileBasedCopy
    
    * Compute data quality and update task states
    
    * Retry for failed data quality check
    
    * Only do data quality check if data quality flag is enabled
    
    * Computing overall data quality of dataset and adding it to dataset summary
    
    * Changes for testing failed data quality changes
    
    * Updating task state and computing overall data quality
    
    * Refactor and checkstyle fix
    
    * Propagate data quality from fork to task state
    
    * Refactoring and unit tests
    
    * Add unit tests for FileAwareInputStreamDataWriterTest
    
    * refactor changes
    
    * Compute task level data quality
    
    * Some refactoring and code cleanup
    
    * Update tests
    
    * Fix checkstyle failures
    
    * Code cleanup
    
    * Adding metrics for overall data quality
    
    * Address PR comments and add metric for number of files evaluated for data 
quality
    
    * Refactor data quality evaluation
    
    * Address PR comments
    
    * Introduce data quality flag to control and manage job failures due to 
data quality
    
    * Address PR comments
    
    * Switch back to CounterBuilder approach for metrics
    
    * Evaluate data quality only for commit via CommitActivity
    
    * Remove unused enum
    
    * move getDataQuality to JobState
    
    * Address PR comments
    
    * Fix the policy evaluation logic when both optional and mandatory policies 
are provided
    
    * Remove codestyle errors and remove flag check at task level
    
    ---------
    
    Co-authored-by: Vaibhav Singhal <[email protected]>
---
 .../gobblin/configuration/ConfigurationKeys.java   |   6 +-
 .../qualitychecker/task/TaskLevelPolicy.java       |   2 +-
 .../gobblin/policies/size/FileSizePolicy.java      | 109 +++++++++
 .../apache/gobblin/publisher/TaskPublisher.java    |   5 +-
 .../gobblin/qualitychecker/DataQualityStatus.java  |  49 ++++
 .../task/TaskLevelPolicyCheckResults.java          |   5 +-
 .../task/TaskLevelPolicyChecker.java               |  22 +-
 .../gobblin/policies/size/FileSizePolicyTest.java  |  72 ++++++
 .../qualitychecker/FailingTaskLevelPolicy.java}    |  22 +-
 .../RowCountTaskLevelPolicyTest.java               |  13 +-
 .../TaskLevelQualityCheckerTest.java               |  69 ------
 .../task/TaskLevelQualityCheckerTest.java          | 106 +++++++++
 .../writer/FileAwareInputStreamDataWriter.java     |  22 +-
 ...ncorrectSizeFileAwareInputStreamDataWriter.java |  84 +++++++
 ...tSizeFileAwareInputStreamDataWriterBuilder.java |  60 +++++
 .../writer/FileAwareInputStreamDataWriterTest.java |  42 ++++
 .../apache/gobblin/metrics/event/TimingEvent.java  |   4 +
 .../apache/gobblin/metrics/ServiceMetricNames.java |   6 +
 .../gobblin/quality/DataQualityEvaluator.java      | 218 +++++++++++++++++
 .../gobblin/runtime/AbstractJobLauncher.java       |   4 +-
 .../apache/gobblin/runtime/DatasetTaskSummary.java |  10 +-
 .../gobblin/runtime/ForkThrowableHolder.java       |   6 +-
 .../org/apache/gobblin/runtime/JobContext.java     |   2 +-
 .../java/org/apache/gobblin/runtime/JobState.java  |  55 +++--
 .../apache/gobblin/runtime/SafeDatasetCommit.java  |  22 ++
 .../main/java/org/apache/gobblin/runtime/Task.java |  41 +++-
 .../java/org/apache/gobblin/runtime/fork/Fork.java |  24 +-
 .../runtime/local/LocalTaskStateTracker.java       |   1 +
 .../gobblin/quality/DataQualityEvaluatorTest.java  | 265 +++++++++++++++++++++
 .../org/apache/gobblin/runtime/JobStateTest.java   |   4 +-
 .../GaaSJobObservabilityProducerTest.java          |   4 +-
 .../ddm/activity/impl/CommitActivityImpl.java      |  12 +-
 .../gobblin/temporal/ddm/work/DatasetStats.java    |   1 +
 .../ddm/workflow/impl/CommitStepWorkflowImpl.java  |   3 +-
 34 files changed, 1220 insertions(+), 150 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 195ca295ee..d33a99c2e5 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -328,6 +328,9 @@ public class ConfigurationKeys {
   public static final String DATASET_URN_KEY = "dataset.urn";
   public static final String GLOBAL_WATERMARK_DATASET_URN = 
"__globalDatasetWatermark";
   public static final String DEFAULT_DATASET_URN = "";
+  public static final String DATASET_QUALITY_STATUS_KEY = 
"dataset.quality.status";
+  public static final String ENFORCE_DATA_QUALITY_FAILURE_KEY = 
"data.quality.enforce.failure";
+  public static final Boolean DEFAULT_ENFORCE_DATA_QUALITY_FAILURE = false;
 
   /**
    * Work unit related configuration properties.
@@ -498,6 +501,7 @@ public class ConfigurationKeys {
    * Configuration properties used by the quality checker.
    */
   public static final String QUALITY_CHECKER_PREFIX = "qualitychecker";
+  public static final String TASK_LEVEL_POLICY_RESULT_KEY = 
"gobblin.task.level.policy.result";
   public static final String TASK_LEVEL_POLICY_LIST = QUALITY_CHECKER_PREFIX + 
".task.policies";
   public static final String TASK_LEVEL_POLICY_LIST_TYPE = 
QUALITY_CHECKER_PREFIX + ".task.policy.types";
   public static final String ROW_LEVEL_POLICY_LIST = QUALITY_CHECKER_PREFIX + 
".row.policies";
@@ -938,7 +942,7 @@ public class ConfigurationKeys {
   public static final Boolean DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED 
= false;
 
   public static final String METRICS_REPORTING_OPENTELEMETRY_ENDPOINT = 
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "endpoint";
-
+  public static final String METRICS_REPORTING_OPENTELEMETRY_FABRIC = 
METRICS_REPORTING_OPENTELEMETRY_CONFIGS_PREFIX + "fabric";
   // Headers to add to the OpenTelemetry HTTP Exporter, formatted as a JSON 
String with string keys and values
   public static final String METRICS_REPORTING_OPENTELEMETRY_HEADERS = 
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "headers";
 
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicy.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicy.java
index d016adfe0a..b5043e01e8 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicy.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicy.java
@@ -22,7 +22,7 @@ import org.apache.gobblin.configuration.WorkUnitState;
 
 
 public abstract class TaskLevelPolicy {
-  private final State state;
+  protected final State state;
   private final Type type;
 
   public enum Type {
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java
new file mode 100644
index 0000000000..d5f935d3bc
--- /dev/null
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import java.util.Optional;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written 
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+  public static final String COPY_PREFIX = "gobblin.copy";
+  public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+  public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+  public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+    super(state, type);
+  }
+
+  @Override
+  public Result executePolicy() {
+    TransferBytes transferBytes = 
getBytesReadAndWritten(this.state).orElse(null);
+    if (transferBytes == null) {
+      return Result.FAILED;
+    }
+    Long bytesRead = transferBytes.getBytesRead();
+    Long bytesWritten = transferBytes.getBytesWritten();
+
+    Long sizeDifference = Math.abs(bytesRead - bytesWritten);
+
+    if (sizeDifference == 0) {
+      return Result.PASSED;
+    }
+
+    log.warn("File size check failed - bytes read: {}, bytes written: {}, 
difference: {}", bytesRead, bytesWritten,
+        sizeDifference);
+    return Result.FAILED;
+  }
+
+  @Override
+  public String toString() {
+    TransferBytes transferBytes = 
getBytesReadAndWritten(this.state).orElse(null);
+    if (transferBytes != null) {
+      return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]", 
transferBytes.getBytesRead(),
+          transferBytes.getBytesWritten());
+    } else {
+      return "FileSizePolicy [bytesRead=null, bytesWritten=null]";
+    }
+  }
+
+  /**
+   * Helper class to hold transfer bytes information
+   */
+  @Getter
+  private static class TransferBytes {
+    final long bytesRead;
+    final long bytesWritten;
+
+    TransferBytes(long bytesRead, long bytesWritten) {
+      this.bytesRead = bytesRead;
+      this.bytesWritten = bytesWritten;
+    }
+  }
+
+  /**
+   * Extracts bytesRead and bytesWritten from the given state.
+   * Returns Empty Optional if parsing fails.
+   */
+  private Optional<TransferBytes> getBytesReadAndWritten(State state) {
+    String bytesReadString = state.getProp(BYTES_READ_KEY);
+    String bytesWrittenString = state.getProp(BYTES_WRITTEN_KEY);
+    if (bytesReadString == null || bytesWrittenString == null) {
+      log.error("Missing value(s): bytesReadStr={}, bytesWrittenStr={}", 
bytesReadString, bytesWrittenString);
+      return Optional.empty();
+    }
+    try {
+      long bytesRead = Long.parseLong(bytesReadString);
+      long bytesWritten = Long.parseLong(bytesWrittenString);
+      return Optional.of(new TransferBytes(bytesRead, bytesWritten));
+    } catch (NumberFormatException e) {
+      log.error("Invalid number format for bytesRead or bytesWritten: 
bytesRead='{}', bytesWritten='{}'",
+          bytesReadString, bytesWrittenString, e);
+      return Optional.empty();
+    }
+  }
+}
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TaskPublisher.java 
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TaskPublisher.java
index 5ca87b0ea0..7a5ad4c1d6 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TaskPublisher.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TaskPublisher.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.publisher;
 
+import java.util.Set;
 import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyCheckResults;
 import java.util.Map;
 
@@ -63,8 +64,8 @@ public class TaskPublisher {
    * Returns true if all tests from the PolicyChecker pass, false otherwise
    */
   public boolean passedAllTests() {
-    for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : 
this.results.getPolicyResults().entrySet()) {
-      if (entry.getKey().equals(TaskLevelPolicy.Result.FAILED) && 
entry.getValue().equals(TaskLevelPolicy.Type.FAIL)) {
+    for (Map.Entry<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> entry : 
this.results.getPolicyResults().entrySet()) {
+      if (entry.getKey().equals(TaskLevelPolicy.Result.FAILED) && 
entry.getValue().contains(TaskLevelPolicy.Type.FAIL)) {
         return false;
       }
     }
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/DataQualityStatus.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/DataQualityStatus.java
new file mode 100644
index 0000000000..855beeb729
--- /dev/null
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/DataQualityStatus.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.qualitychecker;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * An enumeration for possible statuses for Data quality checks.
+ * Its values will be:
+ * - PASSED: When all data quality checks pass
+ * - FAILED: When any data quality check fails
+ * - NOT_EVALUATED: When data quality check evaluation is not performed
+ * - UNKNOWN: when the data quality status is not recognized or invalid
+ */
+@Slf4j
+public enum DataQualityStatus {
+  PASSED,
+  FAILED,
+  NOT_EVALUATED,
+  UNKNOWN;
+
+  public static DataQualityStatus fromString(String value) {
+    if (value == null) {
+      return NOT_EVALUATED;
+    }
+    try {
+      return DataQualityStatus.valueOf(value.toUpperCase());
+    } catch (IllegalArgumentException e) {
+      log.error("Invalid DataQualityStatus value: {}. Returning UNKNOWN.", 
value, e);
+      return UNKNOWN;
+    }
+  }
+}
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyCheckResults.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyCheckResults.java
index 6233f3891f..3094b523d6 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyCheckResults.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyCheckResults.java
@@ -19,19 +19,20 @@ package org.apache.gobblin.qualitychecker.task;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 
 /**
  * Wrapper around a Map of PolicyResults and Policy.Type
  */
 public class TaskLevelPolicyCheckResults {
-  private final Map<TaskLevelPolicy.Result, TaskLevelPolicy.Type> results;
+  private final Map<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> results;
 
   public TaskLevelPolicyCheckResults() {
     this.results = new HashMap<>();
   }
 
-  public Map<TaskLevelPolicy.Result, TaskLevelPolicy.Type> getPolicyResults() {
+  public Map<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> 
getPolicyResults() {
     return this.results;
   }
 }
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyChecker.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyChecker.java
index 909248a4df..a2c19d2fae 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyChecker.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyChecker.java
@@ -17,8 +17,9 @@
 
 package org.apache.gobblin.qualitychecker.task;
 
+import java.util.EnumSet;
 import java.util.List;
-
+import lombok.Getter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,30 +29,25 @@ import org.slf4j.LoggerFactory;
  * executes each one, and then stores the output
  * in a PolicyCheckResults object
  */
+@Getter
 public class TaskLevelPolicyChecker {
-  /**
-   * An enumeration for possible statuses for Data quality checks,
-   * its values will be PASSED, FAILED, in case if data quality check
-   * evaluation is not performed for Job, it will be NOT_EVALUATED
-   */
-  public enum DataQualityStatus {
-    PASSED,
-    FAILED,
-    NOT_EVALUATED
-  }
   private final List<TaskLevelPolicy> list;
   private static final Logger LOG = 
LoggerFactory.getLogger(TaskLevelPolicyChecker.class);
 
+  public static final String TASK_LEVEL_POLICY_RESULT_KEY = 
"gobblin.task.level.policy.result";
+
   public TaskLevelPolicyChecker(List<TaskLevelPolicy> list) {
     this.list = list;
   }
 
   public TaskLevelPolicyCheckResults executePolicies() {
     TaskLevelPolicyCheckResults results = new TaskLevelPolicyCheckResults();
+
     for (TaskLevelPolicy p : this.list) {
       TaskLevelPolicy.Result result = p.executePolicy();
-      results.getPolicyResults().put(result, p.getType());
-      LOG.info("TaskLevelPolicy " + p + " of type " + p.getType() + " executed 
with result " + result);
+      results.getPolicyResults().computeIfAbsent(result, r -> 
EnumSet.noneOf(TaskLevelPolicy.Type.class))
+          .add(p.getType());
+      LOG.info("TaskLevelPolicy {} of type {} executed with result {}", p, 
p.getType(), result);
     }
     return results;
   }
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java
new file mode 100644
index 0000000000..0129891748
--- /dev/null
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class FileSizePolicyTest {
+
+  @Test
+  public void testPolicyPass() {
+    State state = new State();
+    state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L);
+    state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 1000L);
+
+    FileSizePolicy policy = new FileSizePolicy(state, 
TaskLevelPolicy.Type.FAIL);
+    Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.PASSED);
+  }
+
+  @Test
+  public void testPolicyFail() {
+    State state = new State();
+    state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L);
+    state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 900L);
+
+    FileSizePolicy policy = new FileSizePolicy(state, 
TaskLevelPolicy.Type.FAIL);
+    Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
+  }
+
+  @Test
+  public void testMissingProperties() {
+    State state = new State();
+    // No properties set at all
+    FileSizePolicy policy = new FileSizePolicy(state, 
TaskLevelPolicy.Type.FAIL);
+    Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
+  }
+
+  @Test
+  public void testPartiallySetProperties() {
+    State state = new State();
+    // Only set bytes read, not bytes written
+    state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L);
+
+    FileSizePolicy policy = new FileSizePolicy(state, 
TaskLevelPolicy.Type.FAIL);
+    Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
+
+    // Reset state and only set bytes written, not bytes read
+    state = new State();
+    state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 1000L);
+
+    policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL);
+    Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
+  }
+
+}
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyCheckResults.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/FailingTaskLevelPolicy.java
similarity index 63%
copy from 
gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyCheckResults.java
copy to 
gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/FailingTaskLevelPolicy.java
index 6233f3891f..d0e907197b 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyCheckResults.java
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/FailingTaskLevelPolicy.java
@@ -15,23 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.qualitychecker.task;
+package org.apache.gobblin.qualitychecker;
 
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
 
 
-/**
- * Wrapper around a Map of PolicyResults and Policy.Type
- */
-public class TaskLevelPolicyCheckResults {
-  private final Map<TaskLevelPolicy.Result, TaskLevelPolicy.Type> results;
-
-  public TaskLevelPolicyCheckResults() {
-    this.results = new HashMap<>();
+public class FailingTaskLevelPolicy extends TaskLevelPolicy {
+  public FailingTaskLevelPolicy(State state, Type type) {
+    super(state, type);
   }
 
-  public Map<TaskLevelPolicy.Result, TaskLevelPolicy.Type> getPolicyResults() {
-    return this.results;
+  @Override
+  public Result executePolicy() {
+    return Result.FAILED;
   }
 }
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/RowCountTaskLevelPolicyTest.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/RowCountTaskLevelPolicyTest.java
index 43736fa318..d986e1687b 100644
--- 
a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/RowCountTaskLevelPolicyTest.java
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/RowCountTaskLevelPolicyTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.qualitychecker;
 
+import java.util.Set;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyCheckResults;
 import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyChecker;
@@ -46,7 +47,7 @@ public class RowCountTaskLevelPolicyTest {
     state.setProp(ConfigurationKeys.WRITER_ROWS_WRITTEN, WRITER_ROWS_WRITTEN);
 
     TaskLevelPolicyCheckResults results = getPolicyResults(state);
-    for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : 
results.getPolicyResults().entrySet()) {
+    for (Map.Entry<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> entry : 
results.getPolicyResults().entrySet()) {
       Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.PASSED);
     }
   }
@@ -61,7 +62,7 @@ public class RowCountTaskLevelPolicyTest {
     state.setProp(ConfigurationKeys.WRITER_ROWS_WRITTEN, -1);
 
     TaskLevelPolicyCheckResults results = getPolicyResults(state);
-    for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : 
results.getPolicyResults().entrySet()) {
+    for (Map.Entry<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> entry : 
results.getPolicyResults().entrySet()) {
       Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.FAILED);
     }
   }
@@ -77,7 +78,7 @@ public class RowCountTaskLevelPolicyTest {
     state.setProp(ConfigurationKeys.ROW_COUNT_RANGE, "0.05");
 
     TaskLevelPolicyCheckResults results = getPolicyResults(state);
-    for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : 
results.getPolicyResults().entrySet()) {
+    for (Map.Entry<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> entry : 
results.getPolicyResults().entrySet()) {
       Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.PASSED);
     }
   }
@@ -93,7 +94,7 @@ public class RowCountTaskLevelPolicyTest {
     state.setProp(ConfigurationKeys.ROW_COUNT_RANGE, "0.05");
 
     TaskLevelPolicyCheckResults results = getPolicyResults(state);
-    for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : 
results.getPolicyResults().entrySet()) {
+    for (Map.Entry<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> entry : 
results.getPolicyResults().entrySet()) {
       Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.PASSED);
     }
   }
@@ -109,7 +110,7 @@ public class RowCountTaskLevelPolicyTest {
     state.setProp(ConfigurationKeys.ROW_COUNT_RANGE, "0.05");
 
     TaskLevelPolicyCheckResults results = getPolicyResults(state);
-    for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : 
results.getPolicyResults().entrySet()) {
+    for (Map.Entry<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> entry : 
results.getPolicyResults().entrySet()) {
       Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.FAILED);
     }
   }
@@ -126,7 +127,7 @@ public class RowCountTaskLevelPolicyTest {
     state.setProp(ConfigurationKeys.ROW_COUNT_RANGE, "0.05");
 
     TaskLevelPolicyCheckResults results = getPolicyResults(state);
-    for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : 
results.getPolicyResults().entrySet()) {
+    for (Map.Entry<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> entry : 
results.getPolicyResults().entrySet()) {
       Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.PASSED);
     }
   }
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/TaskLevelQualityCheckerTest.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/TaskLevelQualityCheckerTest.java
deleted file mode 100644
index c366faa10d..0000000000
--- 
a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/TaskLevelQualityCheckerTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.qualitychecker;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
-import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyCheckResults;
-import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyChecker;
-import 
org.apache.gobblin.qualitychecker.task.TaskLevelPolicyCheckerBuilderFactory;
-import java.util.Map;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import org.apache.gobblin.configuration.State;
-
-
-@Test(groups = {"gobblin.qualitychecker"})
-public class TaskLevelQualityCheckerTest {
-
-  @Test
-  public void testPolicyChecker()
-      throws Exception {
-    State state = new State();
-    state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST, 
"org.apache.gobblin.qualitychecker.TestTaskLevelPolicy");
-    state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST_TYPE, "FAIL");
-
-    TaskLevelPolicyCheckResults results = getPolicyResults(state);
-    for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : 
results.getPolicyResults().entrySet()) {
-      Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.PASSED);
-    }
-  }
-
-  @Test
-  public void testMultiplePolicies()
-      throws Exception {
-    State state = new State();
-    state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST,
-        
"org.apache.gobblin.qualitychecker.TestTaskLevelPolicy,org.apache.gobblin.qualitychecker.TestTaskLevelPolicy");
-    state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST_TYPE, "FAIL,FAIL");
-
-    TaskLevelPolicyCheckResults results = getPolicyResults(state);
-    for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : 
results.getPolicyResults().entrySet()) {
-      Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.PASSED);
-    }
-  }
-
-  private TaskLevelPolicyCheckResults getPolicyResults(State state)
-      throws Exception {
-    TaskLevelPolicyChecker checker =
-        new 
TaskLevelPolicyCheckerBuilderFactory().newPolicyCheckerBuilder(state, 
-1).build();
-    return checker.executePolicies();
-  }
-}
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/task/TaskLevelQualityCheckerTest.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/task/TaskLevelQualityCheckerTest.java
new file mode 100644
index 0000000000..484a851787
--- /dev/null
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/task/TaskLevelQualityCheckerTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.qualitychecker.task;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+@Test(groups = {"gobblin.qualitychecker"})
+public class TaskLevelQualityCheckerTest {
+
+  @Test
+  public void testPolicyChecker()
+      throws Exception {
+    State state = new State();
+    state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST, 
"org.apache.gobblin.qualitychecker.TestTaskLevelPolicy");
+    state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST_TYPE, "FAIL");
+
+    TaskLevelPolicyCheckResults results = getPolicyResults(state);
+    for (Map.Entry<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> entry : 
results.getPolicyResults().entrySet()) {
+      Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.PASSED);
+    }
+  }
+
+  @Test
+  public void testMultiplePoliciesAllPass()
+      throws Exception {
+    State state = new State();
+    state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST,
+        
"org.apache.gobblin.qualitychecker.TestTaskLevelPolicy,org.apache.gobblin.qualitychecker.TestTaskLevelPolicy");
+    state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST_TYPE, "FAIL,FAIL");
+
+    TaskLevelPolicyCheckResults results = getPolicyResults(state);
+    for (Map.Entry<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> entry : 
results.getPolicyResults().entrySet()) {
+      Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.PASSED);
+    }
+  }
+
+  @Test
+  public void testMultiplePoliciesAllFail()
+      throws Exception {
+    State state = new State();
+    state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST,
+        
"org.apache.gobblin.qualitychecker.FailingTaskLevelPolicy,org.apache.gobblin.qualitychecker.FailingTaskLevelPolicy");
+    state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST_TYPE, "FAIL,FAIL");
+
+    TaskLevelPolicyCheckResults results = getPolicyResults(state);
+    for (Map.Entry<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> entry : 
results.getPolicyResults().entrySet()) {
+      Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.FAILED);
+    }
+  }
+
+  @Test
+  public void testMultiplePoliciesAllFailWithDifferentPolicyTypes()
+      throws Exception {
+    State state = new State();
+    state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST,
+        
"org.apache.gobblin.qualitychecker.FailingTaskLevelPolicy,org.apache.gobblin.qualitychecker.FailingTaskLevelPolicy");
+    state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST_TYPE, 
"OPTIONAL,FAIL");
+
+    TaskLevelPolicyCheckResults results = getPolicyResults(state);
+    
Assert.assertEquals(results.getPolicyResults().get(TaskLevelPolicy.Result.FAILED).size(),
 2);
+  }
+
+  @Test
+  public void testMultiplePoliciesFailAndPassWithDifferentPolicyTypes()
+      throws Exception {
+    State state = new State();
+    state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST,
+        
"org.apache.gobblin.qualitychecker.FailingTaskLevelPolicy,org.apache.gobblin.qualitychecker.FailingTaskLevelPolicy,org.apache.gobblin.qualitychecker.TestTaskLevelPolicy");
+    state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST_TYPE, 
"OPTIONAL,FAIL,FAIL");
+
+    TaskLevelPolicyCheckResults results = getPolicyResults(state);
+    
Assert.assertEquals(results.getPolicyResults().get(TaskLevelPolicy.Result.FAILED).size(),
 2);
+    
Assert.assertEquals(results.getPolicyResults().get(TaskLevelPolicy.Result.PASSED).size(),
 1);
+    
Assert.assertTrue(results.getPolicyResults().get(TaskLevelPolicy.Result.PASSED).contains(TaskLevelPolicy.Type.FAIL));
+    
Assert.assertTrue(results.getPolicyResults().get(TaskLevelPolicy.Result.FAILED).contains(TaskLevelPolicy.Type.FAIL));
+    
Assert.assertTrue(results.getPolicyResults().get(TaskLevelPolicy.Result.FAILED).contains(TaskLevelPolicy.Type.OPTIONAL));
+  }
+
+  private TaskLevelPolicyCheckResults getPolicyResults(State state)
+      throws Exception {
+    TaskLevelPolicyChecker checker =
+        new 
TaskLevelPolicyCheckerBuilderFactory().newPolicyCheckerBuilder(state, 
-1).build();
+    return checker.executePolicies();
+  }
+}
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 4413ce5032..0eb50a4607 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -62,6 +62,7 @@ import 
org.apache.gobblin.data.management.copy.FileAwareInputStream;
 import org.apache.gobblin.data.management.copy.recovery.RecoveryHelper;
 import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
 import org.apache.gobblin.instrumented.writer.InstrumentedDataWriter;
+import org.apache.gobblin.policies.size.FileSizePolicy;
 import org.apache.gobblin.state.ConstructState;
 import org.apache.gobblin.util.FileListUtils;
 import org.apache.gobblin.util.FinalState;
@@ -212,6 +213,21 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
     this.filesWritten.incrementAndGet();
   }
 
+  /**
+   * Records the actual file size in the state after writing.
+   * @param writeAt The path where the file was written
+   * @throws IOException if there is an error getting the file status
+   */
+  private void recordBytesWritten(Path writeAt) throws IOException {
+    try {
+      long actualFileSize = this.fs.getFileStatus(writeAt).getLen();
+      this.state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, actualFileSize);
+    } catch (IOException e) {
+      log.error("Failed to get file status for {} to record bytes written", 
writeAt, e);
+      throw e;
+    }
+  }
+
   /**
    * Write the contents of input stream into staging path.
    *
@@ -236,6 +252,9 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
     final long blockSize = copyableFile.getBlockSize(this.fs);
     final long fileSize = copyableFile.getFileStatus().getLen();
 
+    // Store source file size in task state
+    this.state.setProp(FileSizePolicy.BYTES_READ_KEY, fileSize);
+
     long expectedBytes = fileSize;
     Long maxBytes = null;
     // Whether writer must write EXACTLY maxBytes.
@@ -308,6 +327,7 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
         os.close();
         log.info("OutputStream for file {} is closed.", writeAt);
         inputStream.close();
+        recordBytesWritten(writeAt);
       }
     }
   }
@@ -457,7 +477,6 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
   @Override
   public void commit()
       throws IOException {
-
     if (!this.actualProcessedCopyableFile.isPresent()) {
       return;
     }
@@ -470,7 +489,6 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
     log.info(String.format("Committing data from %s to %s", stagingFilePath, 
outputFilePath));
     try {
       setFilePermissions(copyableFile);
-
       Iterator<OwnerAndPermission> ancestorOwnerAndPermissionIt =
           copyableFile.getAncestorsOwnerAndPermission() == null ? 
Collections.emptyIterator()
               : copyableFile.getAncestorsOwnerAndPermission().listIterator();
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/test/IncorrectSizeFileAwareInputStreamDataWriter.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/test/IncorrectSizeFileAwareInputStreamDataWriter.java
new file mode 100644
index 0000000000..b2ffe69830
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/test/IncorrectSizeFileAwareInputStreamDataWriter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.writer.test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriter;
+import org.apache.hadoop.fs.Path;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.policies.size.FileSizePolicy;
+import org.apache.gobblin.writer.DataWriter;
+
+/**
+ * A {@link DataWriter} that extends {@link FileAwareInputStreamDataWriter} to 
intentionally report incorrect file sizes.
+ * This is useful for testing data quality checks that verify file sizes.
+ *
+ * The writer actually writes the correct data to the destination, but reports 
incorrect sizes in the bytesWritten() method.
+ * The size discrepancy can be configured through properties:
+ * - gobblin.copy.incorrect.size.ratio: Ratio to multiply actual size by 
(default 0.9)
+ * - gobblin.copy.incorrect.size.offset: Fixed offset to add to actual size 
(default 0)
+ */
+@Slf4j
+public class IncorrectSizeFileAwareInputStreamDataWriter extends 
FileAwareInputStreamDataWriter {
+
+  public static final String INCORRECT_SIZE_RATIO_KEY = 
CopyConfiguration.COPY_PREFIX + ".incorrect.size.ratio";
+  public static final String INCORRECT_SIZE_OFFSET_KEY = 
CopyConfiguration.COPY_PREFIX + ".incorrect.size.offset";
+  public static final double DEFAULT_INCORRECT_SIZE_RATIO = 0.9; // Default to 
90% of actual size
+  public static final long DEFAULT_INCORRECT_SIZE_OFFSET = 0L;
+
+  private final double sizeRatio;
+  private final long sizeOffset;
+
+  public IncorrectSizeFileAwareInputStreamDataWriter(State state, int 
numBranches, int branchId)
+      throws IOException {
+    this(state, numBranches, branchId, null);
+  }
+
+  public IncorrectSizeFileAwareInputStreamDataWriter(State state, int 
numBranches, int branchId, String writerAttemptId)
+      throws IOException {
+    super(state, numBranches, branchId, writerAttemptId);
+    this.sizeRatio = state.getPropAsDouble(INCORRECT_SIZE_RATIO_KEY, 
DEFAULT_INCORRECT_SIZE_RATIO);
+    if (this.sizeRatio <= 0 || this.sizeRatio > 1) {
+      throw new IllegalArgumentException("Incorrect size ratio must be in the 
range (0, 1]: " + this.sizeRatio);
+    }
+    this.sizeOffset = state.getPropAsLong(INCORRECT_SIZE_OFFSET_KEY, 
DEFAULT_INCORRECT_SIZE_OFFSET);
+    log.info("Initialized IncorrectSizeFileAwareInputStreamDataWriter with 
ratio={}, offset={}",
+        this.sizeRatio, this.sizeOffset);
+  }
+
+  @Override
+  protected void writeImpl(InputStream inputStream, Path writeAt, CopyableFile 
copyableFile,
+      FileAwareInputStream record) throws IOException {
+    // First call parent's writeImpl to do the actual writing
+    super.writeImpl(inputStream, writeAt, copyableFile, record);
+
+    // After writing is complete, update the state with actual and incorrect 
sizes
+    long actualDestSize = this.fs.getFileStatus(writeAt).getLen();
+    long incorrectBytes = (long)(actualDestSize * this.sizeRatio) + 
this.sizeOffset;
+
+    this.state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, incorrectBytes);
+
+    log.info("File size reporting: actual={}, reported={} (ratio={}, 
offset={})",
+        actualDestSize, incorrectBytes, this.sizeRatio, this.sizeOffset);
+  }
+}
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/test/IncorrectSizeFileAwareInputStreamDataWriterBuilder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/test/IncorrectSizeFileAwareInputStreamDataWriterBuilder.java
new file mode 100644
index 0000000000..98d8da6680
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/test/IncorrectSizeFileAwareInputStreamDataWriterBuilder.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.writer.test;
+
+import java.io.IOException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.writer.DataWriter;
+import org.apache.gobblin.writer.DataWriterBuilder;
+
+/**
+ * Builder for {@link IncorrectSizeFileAwareInputStreamDataWriter}.
+ */
+public class IncorrectSizeFileAwareInputStreamDataWriterBuilder extends 
DataWriterBuilder<String, FileAwareInputStream> {
+
+  @Override
+  public final DataWriter<FileAwareInputStream> build() throws IOException {
+    setJobSpecificOutputPaths(this.destination.getProperties());
+    
this.destination.getProperties().setProp(ConfigurationKeys.WRITER_FILE_PATH, 
this.writerId);
+    return buildWriter();
+  }
+
+  protected DataWriter<FileAwareInputStream> buildWriter() throws IOException {
+    return new 
IncorrectSizeFileAwareInputStreamDataWriter(this.destination.getProperties(), 
this.branches, this.branch, this.writerAttemptId);
+  }
+
+  /**
+   * Each job gets its own task-staging and task-output directory. Update the 
staging and output directories to
+   * contain job_id. This is to make sure uncleaned data from previous 
execution does not corrupt final published data
+   * produced by this execution.
+   */
+  public synchronized static void setJobSpecificOutputPaths(State state) {
+    if 
(!StringUtils.containsIgnoreCase(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR),
+        state.getProp(ConfigurationKeys.JOB_ID_KEY))) {
+
+      state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new 
Path(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR),
+          state.getProp(ConfigurationKeys.JOB_ID_KEY)));
+      state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new 
Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
+          state.getProp(ConfigurationKeys.JOB_ID_KEY)));
+    }
+  }
+}
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
index dc79b8aaed..e53d1eae2f 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.gobblin.policies.size.FileSizePolicy;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -632,6 +633,47 @@ public class FileAwareInputStreamDataWriterTest {
         streamString1);
   }
 
+  @Test
+  public void testStateUpdatesAfterWrite() throws Exception {
+    // Create test data
+    String testContent = "testContentsForStateUpdate";
+    byte[] contentBytes = testContent.getBytes(StandardCharsets.UTF_8);
+    long expectedSourceSize = contentBytes.length;
+
+    FileStatus status = fs.getFileStatus(testTempPath);
+    OwnerAndPermission ownerAndPermission =
+        new OwnerAndPermission(status.getOwner(), status.getGroup(), new 
FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+    CopyableFile cf = 
CopyableFileUtils.getTestCopyableFile(expectedSourceSize, ownerAndPermission);
+    CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new 
TestCopyableDataset(new Path("/source")));
+
+    WorkUnitState state = TestUtils.createTestWorkUnitState();
+    state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(testTempPath, 
"staging").toString());
+    state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath, 
"output").toString());
+    state.setProp(ConfigurationKeys.WRITER_FILE_PATH, 
RandomStringUtils.randomAlphabetic(5));
+    CopySource.serializeCopyEntity(state, cf);
+    CopySource.serializeCopyableDataset(state, metadata);
+
+    FileAwareInputStreamDataWriter dataWriter = new 
FileAwareInputStreamDataWriter(state, 1, 0);
+    FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder()
+        .file(cf)
+        .inputStream(new ByteArrayInputStream(contentBytes))
+        .build();
+
+    // Write data and verify state updates
+    dataWriter.write(fileAwareInputStream);
+
+    // Verify source file size in state
+    String sourceSizeKey = FileSizePolicy.BYTES_READ_KEY;
+    Assert.assertEquals(state.getPropAsLong(sourceSizeKey), expectedSourceSize,
+        "Source file size should be recorded in state");
+
+    // Verify bytes written in state
+    String destinationSizeKey = FileSizePolicy.BYTES_WRITTEN_KEY;
+    Assert.assertEquals(state.getPropAsLong(destinationSizeKey), 
expectedSourceSize,
+        "Bytes written should match source file size");
+
+  }
+
   @AfterClass
   public void cleanup() {
     try {
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 154facfe55..879410c05d 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -83,6 +83,9 @@ public class TimingEvent extends GobblinEventBuilder 
implements Closeable {
     public static final String FLOW_GROUP_FIELD = "flowGroup";
     public static final String FLOW_EXECUTION_ID_FIELD = "flowExecutionId";
     public static final String FLOW_EDGE_FIELD = "flowEdge";
+    public static final String FLOW_FABRIC = "fabric";
+    public static final String FLOW_SOURCE = "source";
+    public static final String FLOW_DESTINATION = "destination";
     public static final String FLOW_MODIFICATION_TIME_FIELD = 
"flowModificationTime";
     public static final String JOB_NAME_FIELD = "jobName";
     public static final String JOB_GROUP_FIELD = "jobGroup";
@@ -115,6 +118,7 @@ public class TimingEvent extends GobblinEventBuilder 
implements Closeable {
   public static final String JOB_LAST_PROGRESS_EVENT_TIME = 
"jobLastProgressEventTime";
   public static final String JOB_COMPLETION_PERCENTAGE = 
"jobCompletionPercentage";
   public static final String DATASET_TASK_SUMMARIES = "datasetTaskSummaries";
+  public static final String DATASET_URN = "datasetUrn";
 
   @Getter
   private Long startTime;
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 021ce11cf0..57e95508b6 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -22,6 +22,12 @@ public class ServiceMetricNames {
   public static final String GOBBLIN_SERVICE_PREFIX = "GobblinService";
   public static final String GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER = 
GOBBLIN_SERVICE_PREFIX + ".";
   public static final String GOBBLIN_JOB_METRICS_PREFIX = "JobMetrics";
+  public static final String DATA_QUALITY_JOB_SUCCESS_COUNT = 
"dataQualitySuccessCount";
+  public static final String DATA_QUALITY_JOB_FAILURE_COUNT = 
"dataQualityFailureCount";
+  public static final String DATA_QUALITY_OVERALL_FILE_COUNT = 
"dataQualityOverallFileCount";
+  public static final String DATA_QUALITY_SUCCESS_FILE_COUNT = 
"dataQualitySuccessFileCount";
+  public static final String DATA_QUALITY_FAILURE_FILE_COUNT = 
"dataQualityFailureFileCount";
+  public static final String DATA_QUALITY_NON_EVALUATED_FILE_COUNT = 
"dataQualityNonEvaluatedFileCount";
 
   // Flow Compilation Meters and Timer
   public static final String FLOW_COMPILATION_SUCCESSFUL_METER = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowCompilation.successful";
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java
new file mode 100644
index 0000000000..959f57053c
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+
+import java.util.List;
+import java.util.Properties;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+  private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+  // Private constructor to prevent instantiation
+  private DataQualityEvaluator() {
+  }
+
+  /**
+   * Result of a data quality evaluation containing the overall status and 
metrics.
+   */
+  @Getter
+  @AllArgsConstructor
+  public static class DataQualityEvaluationResult {
+    private final DataQualityStatus qualityStatus;
+    // Total number of files considered for data quality evaluation
+    private final int totalFiles;
+    // Number of files that passed data quality checks
+    private final int passedFiles;
+    // Number of files that failed data quality checks
+    private final int failedFiles;
+    // Number of files that were not evaluated for data quality, for example 
files that were not found or not processed
+    private final int nonEvaluatedFiles;
+  }
+
+  /**
+   * Evaluates the data quality of a dataset state and stores the result.
+   * This method is specifically designed for dataset-level quality evaluation.
+   *
+   * @param datasetState The dataset state to evaluate and update
+   * @param jobState The job state containing additional context
+   * @return DataQualityEvaluationResult containing the evaluation results
+   */
+  public static DataQualityEvaluationResult 
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState,
+      JobState jobState) {
+    List<TaskState> taskStates = datasetState.getTaskStates();
+    DataQualityEvaluationResult result = evaluateDataQuality(taskStates, 
jobState);
+
+    // Store the result in the dataset state
+    jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
result.getQualityStatus().name());
+    // Emit dataset-specific metrics
+    emitMetrics(jobState, result.getQualityStatus(), result.getTotalFiles(), 
result.getPassedFiles(),
+        result.getFailedFiles(), result.getNonEvaluatedFiles(), 
datasetState.getDatasetUrn());
+
+    return result;
+  }
+
+  /**
+   * Evaluates the data quality of a set of task states and emits relevant 
metrics.
+   *
+   * @param taskStates List of task states to evaluate
+   * @param jobState The job state containing additional context
+   * @return DataQualityEvaluationResult containing the evaluation results
+   */
+  public static DataQualityEvaluationResult 
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+    DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED;
+    int totalFiles = 0;
+    int failedFilesCount = 0;
+    int passedFilesCount = 0;
+    int nonEvaluatedFilesCount = 0;
+
+    for (TaskState taskState : taskStates) {
+      totalFiles++;
+
+      // Handle null task states gracefully
+      if (taskState == null) {
+        log.warn("Encountered null task state, skipping data quality 
evaluation for this task");
+        nonEvaluatedFilesCount++;
+        continue;
+      }
+
+      DataQualityStatus taskDataQuality = null;
+      String result = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+      taskDataQuality = DataQualityStatus.fromString(result);
+      if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+        log.debug("Data quality status of this task is: " + taskDataQuality);
+        if (DataQualityStatus.PASSED == taskDataQuality) {
+          passedFilesCount++;
+        } else if (DataQualityStatus.FAILED == taskDataQuality) {
+          failedFilesCount++;
+          jobDataQualityStatus = DataQualityStatus.FAILED;
+        } else {
+          log.warn("Unexpected data quality status: " + taskDataQuality + " 
for task: " + taskState.getTaskId());
+        }
+      } else {
+        // Handle files without data quality evaluation
+        nonEvaluatedFilesCount++;
+        log.warn("No data quality evaluation for task: " + 
taskState.getTaskId());
+      }
+    }
+
+    // Log summary of evaluation
+    log.info("Data quality evaluation summary - Total: {}, Passed: {}, Failed: 
{}, Not Evaluated: {}", totalFiles,
+        passedFilesCount, failedFilesCount, nonEvaluatedFilesCount);
+    return new DataQualityEvaluationResult(jobDataQualityStatus, totalFiles, 
passedFilesCount, failedFilesCount,
+        nonEvaluatedFilesCount);
+  }
+
+  private static void emitMetrics(JobState jobState, final DataQualityStatus 
jobDataQuality, final int totalFiles,
+      final int passedFilesCount, final int failedFilesCount, final int 
nonEvaluatedFilesCount,
+      final String datasetUrn) {
+    try {
+      // Check if OpenTelemetry is enabled
+      boolean otelEnabled = 
jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
+          ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED);
+
+      if (!otelEnabled) {
+        log.info("OpenTelemetry metrics disabled, skipping metrics emission");
+        return;
+      }
+
+      OpenTelemetryMetricsBase otelMetrics = 
OpenTelemetryMetrics.getInstance(jobState);
+      if (otelMetrics == null) {
+        log.warn("OpenTelemetry metrics instance is null, skipping metrics 
emission");
+        return;
+      }
+      log.info("OpenTelemetry instance obtained");
+
+      Meter meter = otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME);
+      Attributes tags = getTagsForDataQualityMetrics(jobState, datasetUrn);
+      log.info("Emitting DQ metrics for job={}, status={}, tags={}", 
jobState.getJobName(), jobDataQuality, tags);
+      String jobMetricDescription = "Number of Jobs with data quality status: 
" + jobDataQuality;
+      String jobMetricName =
+          (jobDataQuality == DataQualityStatus.PASSED) ? 
ServiceMetricNames.DATA_QUALITY_JOB_SUCCESS_COUNT
+              : ServiceMetricNames.DATA_QUALITY_JOB_FAILURE_COUNT;
+      log.info("Data quality status for job: {} is {}", jobState.getJobName(), 
jobDataQuality);
+      
meter.counterBuilder(jobMetricName).setDescription(jobMetricDescription).build().add(1,
 tags);
+
+      // Emit overall files count
+      meter.counterBuilder(ServiceMetricNames.DATA_QUALITY_OVERALL_FILE_COUNT)
+          .setDescription("Number of files evaluated for data 
quality").build().add(totalFiles, tags);
+
+      // Emit passed files count
+      meter.counterBuilder(ServiceMetricNames.DATA_QUALITY_SUCCESS_FILE_COUNT)
+          .setDescription("Number of files that passed data 
quality").build().add(passedFilesCount, tags);
+
+      // Emit failed files count
+      meter.counterBuilder(ServiceMetricNames.DATA_QUALITY_FAILURE_FILE_COUNT)
+          .setDescription("Number of files that failed data 
quality").build().add(failedFilesCount, tags);
+
+      // Emit non-evaluated files count
+      
meter.counterBuilder(ServiceMetricNames.DATA_QUALITY_NON_EVALUATED_FILE_COUNT)
+          .setDescription("Number of files that did not have data quality 
evaluation").build()
+          .add(nonEvaluatedFilesCount, tags);
+    } catch (Exception e) {
+      log.error("Error in emitMetrics for job: {}", jobState.getJobName(), e);
+    }
+    log.info("Completed emitMetrics for job: {}", jobState.getJobName());
+  }
+
+  private static Attributes getTagsForDataQualityMetrics(JobState jobState, 
String datasetUrn) {
+    Properties jobProperties = jobState.getProperties();
+    log.info("Job properties loaded: " + jobProperties);
+
+    return 
Attributes.builder().put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobState.getJobName())
+        .put(TimingEvent.DATASET_URN, datasetUrn)
+        .put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
jobProperties.getProperty(ConfigurationKeys.FLOW_NAME_KEY))
+        .put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+            jobProperties.getProperty(ConfigurationKeys.FLOW_GROUP_KEY))
+        .put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+            jobProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
+        .put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
+            jobProperties.getProperty(ConfigurationKeys.FLOW_EDGE_ID_KEY))
+        .put(TimingEvent.FlowEventConstants.FLOW_FABRIC,
+            
jobProperties.getProperty(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_FABRIC,
 null))
+        .put(TimingEvent.FlowEventConstants.FLOW_SOURCE,
+            
jobProperties.getProperty(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, ""))
+        .put(TimingEvent.FlowEventConstants.FLOW_DESTINATION,
+            
jobProperties.getProperty(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, 
""))
+        .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
+            jobProperties.getProperty(ConfigurationKeys.FLOW_SPEC_EXECUTOR, 
"")).build();
+  }
+}
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index ca6b80bd2f..7acda0884d 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -752,12 +752,12 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
         }
         LOG.info(String.format("DatasetMetrics for '%s' - (records: %d; bytes: 
%d)", datasetState.getDatasetUrn(), totalRecordsWritten, totalBytesWritten));
         datasetTaskSummaries.add(new 
DatasetTaskSummary(datasetState.getDatasetUrn(), totalRecordsWritten, 
totalBytesWritten,
-            datasetState.getState() == JobState.RunningState.COMMITTED));
+            datasetState.getState() == JobState.RunningState.COMMITTED, 
jobState.getDataQualityStatus().name()));
       } else if (datasetState.getState() == JobState.RunningState.FAILED) {
         // Check if config is turned on for submitting writer metrics on 
failure due to non-atomic write semantics
         if (this.jobContext.getJobCommitPolicy() == 
JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
           LOG.info("Due to task failure, will report that no records or bytes 
were written for " + datasetState.getDatasetUrn());
-          datasetTaskSummaries.add(new 
DatasetTaskSummary(datasetState.getDatasetUrn(), 0, 0, false));
+          datasetTaskSummaries.add(new 
DatasetTaskSummary(datasetState.getDatasetUrn(), 0, 0, false, 
jobState.getDataQualityStatus().name()));
         }
       }
     }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
index 1aebbc2f86..f8c1d2a1e0 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
@@ -24,9 +24,8 @@ import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 
+import lombok.ToString;
 import org.apache.gobblin.metrics.DatasetMetric;
-import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyChecker;
-
 
 /**
  * A class returned by {@link org.apache.gobblin.runtime.SafeDatasetCommit} to 
provide metrics for the dataset
@@ -35,18 +34,19 @@ import 
org.apache.gobblin.qualitychecker.task.TaskLevelPolicyChecker;
 @Data
 @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable 
deserialization
 @RequiredArgsConstructor
-@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@NoArgsConstructor
+@ToString
 public class DatasetTaskSummary {
   @NonNull private String datasetUrn;
   @NonNull private long recordsWritten;
   @NonNull private long bytesWritten;
   @NonNull private boolean successfullyCommitted;
+  @NonNull private String dataQualityStatus;
 
   /**
    * Convert a {@link DatasetTaskSummary} to a {@link DatasetMetric}.
    */
   public static DatasetMetric toDatasetMetric(DatasetTaskSummary 
datasetTaskSummary) {
-    return new DatasetMetric(datasetTaskSummary.getDatasetUrn(), 
datasetTaskSummary.getBytesWritten(), datasetTaskSummary.getRecordsWritten(), 
datasetTaskSummary.isSuccessfullyCommitted(),
-        TaskLevelPolicyChecker.DataQualityStatus.NOT_EVALUATED.name());
+    return new DatasetMetric(datasetTaskSummary.getDatasetUrn(), 
datasetTaskSummary.getBytesWritten(), datasetTaskSummary.getRecordsWritten(), 
datasetTaskSummary.isSuccessfullyCommitted(), 
datasetTaskSummary.getDataQualityStatus());
   }
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
index cf53d560ac..911cb0aec7 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
@@ -26,6 +26,7 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
 
 
 /**
@@ -48,9 +49,12 @@ public class ForkThrowableHolder {
     return throwables.isEmpty();
   }
 
-  public ForkException getAggregatedException (List<Integer> failedForkIds, 
String taskId) {
+  public ForkException getAggregatedException (List<Integer> failedForkIds, 
String taskId, DataQualityStatus taskDataQuality) {
     StringBuffer stringBuffer = new StringBuffer();
     stringBuffer.append("Fork branches " + failedForkIds + " failed for task " 
+ taskId + "\n");
+    if(DataQualityStatus.FAILED == taskDataQuality){
+      stringBuffer.append("DataQuality failed for task: " + taskId + "\n");
+    }
     for (Integer idx: failedForkIds) {
       stringBuffer.append("<Fork " + idx + ">\n");
       if (this.throwables.containsKey(idx)) {
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index 3f6dca4a92..e591ad6859 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -516,7 +516,7 @@ public class JobContext implements Closeable {
       DeliverySemantics deliverySemantics, String datasetUrn, 
JobState.DatasetState datasetState,
       boolean isMultithreaded, JobContext jobContext) {
     return new SafeDatasetCommit(shouldCommitDataInJob, isJobCancelled, 
deliverySemantics, datasetUrn, datasetState,
-        isMultithreaded, jobContext);
+        isMultithreaded, jobContext, SafeDatasetCommit.COMMIT_SRC_JOB_CONTEXT);
   }
 
   protected Map<String, JobState.DatasetState> computeDatasetStatesByUrns() {
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
index d61777f833..8d1bc890da 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
@@ -30,25 +30,15 @@ import java.util.Properties;
 import lombok.Getter;
 import lombok.Setter;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.metastore.DatasetStateStore;
 import org.apache.gobblin.runtime.job.JobProgress;
+import org.apache.gobblin.runtime.util.MetricGroup;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.io.Text;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Meter;
-import com.google.common.base.Enums;
-import com.google.common.base.Optional;
-import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.gson.stream.JsonWriter;
-import com.linkedin.data.template.StringMap;
+import org.apache.gobblin.source.workunit.WorkUnit;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
@@ -63,13 +53,24 @@ import org.apache.gobblin.rest.MetricTypeEnum;
 import org.apache.gobblin.rest.TaskExecutionInfoArray;
 import org.apache.gobblin.runtime.api.MonitoredObject;
 import org.apache.gobblin.runtime.util.JobMetrics;
-import org.apache.gobblin.runtime.util.MetricGroup;
-import org.apache.gobblin.source.extractor.JobCommitPolicy;
-import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.google.common.base.Enums;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.gson.stream.JsonWriter;
+import com.linkedin.data.template.StringMap;
+import org.apache.hadoop.io.Text;
 import org.apache.gobblin.util.ImmutableProperties;
 import org.apache.gobblin.util.JobLauncherUtils;
 
-
 /**
  * A class for tracking job state information.
  *
@@ -94,6 +95,8 @@ public class JobState extends SourceState implements 
JobProgress {
    *    <li> SUCCESSFUL => CANCELLED  (cancelled before committing)
    * </ul>
    */
+  public static final String GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX = 
"GaaSJobObservabilityEventProducer.";
+  public static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics";
   public enum RunningState implements MonitoredObject {
     /** Pending creation of {@link WorkUnit}s. */
     PENDING,
@@ -588,6 +591,15 @@ public class JobState extends SourceState implements 
JobProgress {
         .name("completed tasks").value(this.getCompletedTasks());
   }
 
+  /**
+   * Gets the overall data quality status of the job.
+   * @return the {@link DataQualityStatus} parsed from the dataset-quality job 
property (e.g. PASSED, FAILED, NOT_EVALUATED)
+   */
+  public DataQualityStatus getDataQualityStatus() {
+    return 
DataQualityStatus.fromString(super.getProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY));
+  }
+
+
   protected void propsToJson(JsonWriter jsonWriter)
       throws IOException {
     jsonWriter.beginObject();
@@ -756,6 +768,7 @@ public class JobState extends SourceState implements 
JobProgress {
    *   and {@link #setProp(String, Object)} are not supported.
    * </p>
    */
+  @Slf4j
   public static class DatasetState extends JobState {
 
     // For serialization/deserialization
@@ -794,6 +807,7 @@ public class JobState extends SourceState implements 
JobProgress {
       jsonWriter.beginObject();
       
jsonWriter.name(ConfigurationKeys.DATASET_URN_KEY).value(getDatasetUrn());
       
jsonWriter.name(ConfigurationKeys.JOB_FAILURES_KEY).value(getJobFailures());
+      
jsonWriter.name(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY).value(getDataQualityStatus().name());
       jsonWriter.endObject();
     }
 
@@ -831,7 +845,10 @@ public class JobState extends SourceState implements 
JobProgress {
     protected void writeStateSummary(JsonWriter jsonWriter)
         throws IOException {
       super.writeStateSummary(jsonWriter);
+      
jsonWriter.name(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY).value(getDataQualityStatus().name());
       jsonWriter.name("datasetUrn").value(getDatasetUrn());
     }
+
+
   }
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index 70a9d9a96e..de355e47d0 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -54,6 +54,7 @@ import 
org.apache.gobblin.runtime.commit.DatasetStateCommitStep;
 import org.apache.gobblin.runtime.task.TaskFactory;
 import org.apache.gobblin.runtime.task.TaskUtils;
 import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.quality.DataQualityEvaluator;
 
 
 /**
@@ -69,6 +70,8 @@ public final class SafeDatasetCommit implements 
Callable<Void> {
 
   private static final String DATASET_STATE = "datasetState";
   private static final String FAILED_DATASET_EVENT = "failedDataset";
+  public static final String COMMIT_SRC_JOB_CONTEXT = "JobContext";
+  public static final String COMMIT_SRC_COMMIT_ACTIVITY_IMPL = 
"CommitActivityImpl";
 
   private final boolean shouldCommitDataInJob;
   private final boolean isJobCancelled;
@@ -77,6 +80,7 @@ public final class SafeDatasetCommit implements 
Callable<Void> {
   private final JobState.DatasetState datasetState;
   private final boolean isMultithreaded;
   private final JobContext jobContext;
+  private final String datasetCommitSrc;
 
   private MetricContext metricContext;
 
@@ -90,6 +94,14 @@ public final class SafeDatasetCommit implements 
Callable<Void> {
     metricContext = Instrumented.getMetricContext(datasetState, 
SafeDatasetCommit.class);
 
     finalizeDatasetStateBeforeCommit(this.datasetState);
+    // evaluate data quality at the dataset commit level, only when commit 
source is CommitActivityImpl
+    if 
(SafeDatasetCommit.COMMIT_SRC_COMMIT_ACTIVITY_IMPL.equals(this.datasetCommitSrc))
 {
+      log.info("Evaluating data quality for commit activity for dataset {}.", 
this.datasetUrn);
+       evaluateAndEmitDatasetQuality();
+    } else {
+      log.info("Skipping data quality evaluation for dataset {} as commit 
source is {}", this.datasetUrn,
+          this.datasetCommitSrc);
+    }
     Class<? extends DataPublisher> dataPublisherClass;
     try (Closer closer = Closer.create()) {
       dataPublisherClass = 
JobContext.getJobDataPublisherClass(this.jobContext.getJobState())
@@ -438,4 +450,14 @@ public final class SafeDatasetCommit implements 
Callable<Void> {
         .withDatasetState(datasetState).build());
   }
 
+  /**
+   * Evaluates and stores the data quality status for the dataset.
+   * This method handles the business logic of data quality evaluation
+   * at the dataset commit level, which is more appropriate than having
+   * it in the JobState data container.
+   */
+  private void evaluateAndEmitDatasetQuality() {
+    DataQualityEvaluator.evaluateAndReportDatasetQuality(this.datasetState, 
this.jobContext.getJobState());
+  }
+
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 7ddb4f11f2..2dd2e83f5d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -32,6 +32,7 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.BooleanUtils;
+import org.mortbay.log.Log;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -96,6 +97,7 @@ import org.apache.gobblin.writer.TrackerBasedWatermarkManager;
 import org.apache.gobblin.writer.WatermarkAwareWriter;
 import org.apache.gobblin.writer.WatermarkManager;
 import org.apache.gobblin.writer.WatermarkStorage;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
 
 
 /**
@@ -316,6 +318,32 @@ public class Task implements TaskIFace {
     this.shutdownLatch.countDown();
   }
 
+  private void computeAndUpdateTaskDataQuality() {
+    DataQualityStatus overallTaskDataQuality = DataQualityStatus.PASSED;
+    for (Optional<Fork> fork : this.forks.keySet()) {
+      if (fork.isPresent()) {
+        TaskState forkTaskState = fork.get().getTaskState();
+        if (forkTaskState != null) {
+          String result = 
forkTaskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+          DataQualityStatus forkDataQualityStatus = null;
+          try {
+            if (result != null) {
+              forkDataQualityStatus = DataQualityStatus.valueOf(result);
+            }
+          } catch (IllegalArgumentException e) {
+            Log.warn("Unknown data quality status encountered: " + result);
+            forkDataQualityStatus = DataQualityStatus.UNKNOWN;
+          }
+          if (DataQualityStatus.FAILED == forkDataQualityStatus) {
+            overallTaskDataQuality = DataQualityStatus.FAILED;
+          }
+        }
+      }
+    }
+    LOG.info("Data quality state of the task is {}", overallTaskDataQuality);
+    this.taskState.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY, 
overallTaskDataQuality.name());
+  }
+
   private boolean shutdownRequested() {
     if (!this.shutdownRequested.get()) {
       this.shutdownRequested.set(Thread.currentThread().isInterrupted());
@@ -385,7 +413,7 @@ public class Task implements TaskIFace {
           if (failedForksId.size() == 1 && 
holder.getThrowable(failedForksId.get(0)).isPresent()) {
             e = holder.getThrowable(failedForksId.get(0)).get();
           }else{
-            e = holder.getAggregatedException(failedForksId, this.taskId);
+            e = holder.getAggregatedException(failedForksId, this.taskId, 
null);
           }
         }
         throw e == null ? new RuntimeException("Some forks failed") : e;
@@ -927,7 +955,7 @@ public class Task implements TaskIFace {
           }
         }
       }
-
+      this.computeAndUpdateTaskDataQuality();
       if (failedForkIds.size() == 0) {
         // Set the task state to SUCCESSFUL. The state is not set to COMMITTED
         // as the data publisher will do that upon successful data publishing.
@@ -942,7 +970,14 @@ public class Task implements TaskIFace {
           if (failedForkIds.size() == 1 && 
holder.getThrowable(failedForkIds.get(0)).isPresent()) {
             failTask(holder.getThrowable(failedForkIds.get(0)).get());
           } else {
-            failTask(holder.getAggregatedException(failedForkIds, 
this.taskId));
+            String taskDataQualityString = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+            DataQualityStatus dataQualityStatus = 
DataQualityStatus.NOT_EVALUATED;
+            if (taskDataQualityString != null) {
+              dataQualityStatus = 
DataQualityStatus.valueOf(taskDataQualityString);
+            } else {
+              Log.warn("Task data quality status is not set, defaulting to 
NOT_EVALUATED for taskId: {}", this.taskId);
+            }
+            failTask(holder.getAggregatedException(failedForkIds, this.taskId, 
dataQualityStatus));
           }
         } else {
           // just in case there are some corner cases where Fork throw an 
exception but doesn't add into holder
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index ed83c0d538..0ff7cde453 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -21,6 +21,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -352,6 +354,20 @@ public class Fork<S, D> implements Closeable, FinalState, 
RecordStreamConsumer<S
     return parentTaskDone;
   }
 
+
+  /**
+   * Get the {@link TaskState} associated with this {@link Fork}.
+   *
+   * <p>
+   *   The {@link TaskState} contains information about the execution state of 
this {@link Fork},
+   *   including metrics and other runtime data. This method allows access to 
the {@link TaskState}
+   *   for monitoring or reporting purposes.
+   * </p>
+   *
+   * @return the {@link TaskState} of the {@link Fork}.
+   */
+  public TaskState getTaskState() { return this.forkTaskState; }
+
   /**
    * Update record-level metrics.
    */
@@ -377,7 +393,8 @@ public class Fork<S, D> implements Closeable, FinalState, 
RecordStreamConsumer<S
    */
   public boolean commit() {
     try {
-      if (checkDataQuality(this.convertedSchema)) {
+      boolean dataQualityEnforced = 
this.forkTaskState.getPropAsBoolean(ConfigurationKeys.ENFORCE_DATA_QUALITY_FAILURE_KEY,
 ConfigurationKeys.DEFAULT_ENFORCE_DATA_QUALITY_FAILURE);
+      if (checkDataQuality(this.convertedSchema) || !dataQualityEnforced) {
         // Commit data if all quality checkers pass. Again, not to catch the 
exception
         // it may throw so the exception gets propagated to the caller of this 
method.
         this.logger.info(String.format("Committing data for fork %d of task 
%s", this.index, this.taskId));
@@ -613,6 +630,11 @@ public class Fork<S, D> implements Closeable, FinalState, 
RecordStreamConsumer<S
       TaskLevelPolicyCheckResults taskResults =
           this.taskContext.getTaskLevelPolicyChecker(this.forkTaskState, 
this.branches > 1 ? this.index : -1)
               .executePolicies();
+      boolean hasFailureForMandatoryPolicy = taskResults.getPolicyResults()
+          .getOrDefault(TaskLevelPolicy.Result.FAILED, 
java.util.Collections.emptySet())
+          .contains(TaskLevelPolicy.Type.FAIL);
+      forkTaskState.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY,
+          hasFailureForMandatoryPolicy ? DataQualityStatus.FAILED.name() : 
DataQualityStatus.PASSED.name());
       TaskPublisher publisher = 
this.taskContext.getTaskPublisher(this.forkTaskState, taskResults);
       switch (publisher.canPublish()) {
         case SUCCESS:
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java
index b2e3e8b09b..4a6ebee5e9 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java
@@ -89,6 +89,7 @@ public class LocalTaskStateTracker extends 
AbstractTaskStateTracker {
     try {
       // Check the task state and handle task retry if task failed and
       // it has not reached the maximum number of retries
+
       WorkUnitState.WorkingState state = task.getTaskState().getWorkingState();
       if (state == WorkUnitState.WorkingState.FAILED && task.getRetryCount() < 
this.maxTaskRetries) {
         this.taskExecutor.retry(task);
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/quality/DataQualityEvaluatorTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/quality/DataQualityEvaluatorTest.java
new file mode 100644
index 0000000000..32bf9658c7
--- /dev/null
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/quality/DataQualityEvaluatorTest.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link DataQualityEvaluator}
+ */
+@Test(groups = {"gobblin.quality"})
+public class DataQualityEvaluatorTest {
+
+  @Test
+  public void testDataQualityEvaluationWithAllPassedTasks() {
+    // Create job state with properties
+    JobState jobState = new JobState("TestJob", "TestJob-1");
+    jobState.setProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
"TestFlow");
+    jobState.setProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
"TestGroup");
+    jobState.setProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
"12345");
+    jobState.setProp(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, "edge1");
+    jobState.setProp(ServiceConfigKeys.GOBBLIN_SERVICE_INSTANCE_NAME, 
"test-instance");
+    jobState.setProp(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
"test-executor");
+    jobState.setProp(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, 
"test-source");
+    jobState.setProp(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, 
"test-destination");
+
+    // Create task states with PASSED quality status
+    List<TaskState> taskStates = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      TaskState taskState = new TaskState();
+      taskState.setTaskId("Task-" + i);
+      taskState.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY, 
DataQualityStatus.PASSED.name());
+      taskStates.add(taskState);
+    }
+
+    // Evaluate data quality
+    DataQualityEvaluator.DataQualityEvaluationResult result =
+        DataQualityEvaluator.evaluateDataQuality(taskStates, jobState);
+
+    // Verify results
+    Assert.assertEquals(result.getQualityStatus(), DataQualityStatus.PASSED);
+    Assert.assertEquals(result.getTotalFiles(), 3);
+    Assert.assertEquals(result.getPassedFiles(), 3);
+    Assert.assertEquals(result.getFailedFiles(), 0);
+    Assert.assertEquals(result.getNonEvaluatedFiles(), 0);
+  }
+
+  @Test
+  public void testDataQualityEvaluationWithMixedResults() {
+    // Create job state
+    JobState jobState = new JobState("TestJob", "TestJob-1");
+    jobState.setProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
"TestFlow");
+    jobState.setProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
"TestGroup");
+
+    // Create task states with mixed quality status
+    List<TaskState> taskStates = new ArrayList<>();
+
+    // Passed task
+    TaskState passedTask = new TaskState();
+    passedTask.setTaskId("Task-1");
+    passedTask.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY, 
DataQualityStatus.PASSED.name());
+    taskStates.add(passedTask);
+
+    // Failed task
+    TaskState failedTask = new TaskState();
+    failedTask.setTaskId("Task-2");
+    failedTask.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY, 
DataQualityStatus.FAILED.name());
+    taskStates.add(failedTask);
+
+    // Not evaluated task
+    TaskState notEvaluatedTask = new TaskState();
+    notEvaluatedTask.setTaskId("Task-3");
+    notEvaluatedTask.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY, 
DataQualityStatus.NOT_EVALUATED.name());
+    taskStates.add(notEvaluatedTask);
+
+    // Evaluate data quality
+    DataQualityEvaluator.DataQualityEvaluationResult result =
+        DataQualityEvaluator.evaluateDataQuality(taskStates, jobState);
+
+    // Verify results
+    Assert.assertEquals(result.getQualityStatus(), DataQualityStatus.FAILED);
+    Assert.assertEquals(result.getTotalFiles(), 3);
+    Assert.assertEquals(result.getPassedFiles(), 1);
+    Assert.assertEquals(result.getFailedFiles(), 1);
+    Assert.assertEquals(result.getNonEvaluatedFiles(), 1);
+  }
+
+  @Test
+  public void testJobPropertiesRetrieval() {
+    // Create job state with various properties
+    JobState jobState = new JobState("TestJob", "TestJob-1");
+    jobState.setProp("test.property1", "value1");
+    jobState.setProp("test.property2", "value2");
+    jobState.setProp(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, 
"test-source");
+    jobState.setProp(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, 
"test-destination");
+
+    // Get properties directly from JobState
+    Properties jobProperties = jobState.getProperties();
+
+    // Verify properties are correctly retrieved
+    Assert.assertEquals(jobProperties.getProperty("test.property1"), "value1");
+    Assert.assertEquals(jobProperties.getProperty("test.property2"), "value2");
+    
Assert.assertEquals(jobProperties.getProperty(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY),
 "test-source");
+    
Assert.assertEquals(jobProperties.getProperty(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY),
+        "test-destination");
+  }
+
+  @Test
+  public void testDatasetQualityEvaluation() {
+    // Create job state
+    JobState jobState = new JobState("TestJob", "TestJob-1");
+    jobState.setProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
"TestFlow");
+    jobState.setProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
"TestGroup");
+
+    // Create dataset state
+    JobState.DatasetState datasetState = new JobState.DatasetState("TestJob", 
"TestJob-1");
+    datasetState.setDatasetUrn("test://dataset/urn");
+
+    // Create task states
+    List<TaskState> taskStates = new ArrayList<>();
+    TaskState taskState = new TaskState();
+    taskState.setTaskId("Task-1");
+    taskState.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY, 
DataQualityStatus.PASSED.name());
+    taskStates.add(taskState);
+
+    // Add task states to dataset state
+    for (TaskState ts : taskStates) {
+      datasetState.addTaskState(ts);
+    }
+    // Evaluate dataset quality
+    DataQualityEvaluator.DataQualityEvaluationResult result =
+        DataQualityEvaluator.evaluateAndReportDatasetQuality(datasetState, 
jobState);
+
+    // Verify results
+    Assert.assertEquals(result.getQualityStatus(), DataQualityStatus.PASSED);
+    Assert.assertEquals(result.getTotalFiles(), 1);
+    Assert.assertEquals(result.getPassedFiles(), 1);
+    Assert.assertEquals(result.getFailedFiles(), 0);
+    Assert.assertEquals(result.getNonEvaluatedFiles(), 0);
+
+    // Verify dataset state was updated
+    //Assert.assertEquals(datasetState.getDataQualityStatus(), 
DataQualityStatus.PASSED);
+  }
+
+  @Test
+  public void testDataQualityEvaluationWithNullTaskStates() {
+    // Create job state
+    JobState jobState = new JobState("TestJob", "TestJob-1");
+    jobState.setProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
"TestFlow");
+    jobState.setProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
"TestGroup");
+
+    // Create task states with some null entries
+    List<TaskState> taskStates = new ArrayList<>();
+
+    // Valid passed task
+    TaskState passedTask = new TaskState();
+    passedTask.setTaskId("Task-1");
+    passedTask.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY, 
DataQualityStatus.PASSED.name());
+    taskStates.add(passedTask);
+
+    // Null task state
+    taskStates.add(null);
+
+    // Valid failed task
+    TaskState failedTask = new TaskState();
+    failedTask.setTaskId("Task-3");
+    failedTask.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY, 
DataQualityStatus.FAILED.name());
+    taskStates.add(failedTask);
+
+    // Another null task state
+    taskStates.add(null);
+
+    // Valid not evaluated task
+    TaskState notEvaluatedTask = new TaskState();
+    notEvaluatedTask.setTaskId("Task-5");
+    notEvaluatedTask.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY, 
DataQualityStatus.NOT_EVALUATED.name());
+    taskStates.add(notEvaluatedTask);
+
+    // Evaluate data quality
+    DataQualityEvaluator.DataQualityEvaluationResult result =
+        DataQualityEvaluator.evaluateDataQuality(taskStates, jobState);
+
+    // Verify results - should handle null task states gracefully
+    // Null task states are counted as NOT_EVALUATED rather than skipped
+    Assert.assertEquals(result.getQualityStatus(), DataQualityStatus.FAILED);
+    Assert.assertEquals(result.getTotalFiles(), 5); // Total includes null 
entries
+    Assert.assertEquals(result.getPassedFiles(), 1);
+    Assert.assertEquals(result.getFailedFiles(), 1);
+    Assert.assertEquals(result.getNonEvaluatedFiles(), 3);
+  }
+
+  @Test
+  public void testDataQualityEvaluationWithAllNullTaskStates() {
+    // Create job state
+    JobState jobState = new JobState("TestJob", "TestJob-1");
+    jobState.setProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
"TestFlow");
+    jobState.setProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
"TestGroup");
+
+    // Create list with all null task states
+    List<TaskState> taskStates = new ArrayList<>();
+    taskStates.add(null);
+    taskStates.add(null);
+    taskStates.add(null);
+
+    // Evaluate data quality
+    DataQualityEvaluator.DataQualityEvaluationResult result =
+        DataQualityEvaluator.evaluateDataQuality(taskStates, jobState);
+
+    // Verify results - should handle all null task states gracefully
+    Assert.assertEquals(result.getQualityStatus(), DataQualityStatus.PASSED); 
// Default status when no failures
+    Assert.assertEquals(result.getTotalFiles(), 3);
+    Assert.assertEquals(result.getPassedFiles(), 0);
+    Assert.assertEquals(result.getFailedFiles(), 0);
+    Assert.assertEquals(result.getNonEvaluatedFiles(), 3);
+  }
+
+  @Test
+  public void testDataQualityEvaluationWithEmptyTaskStatesList() {
+    // Create job state
+    JobState jobState = new JobState("TestJob", "TestJob-1");
+    jobState.setProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
"TestFlow");
+    jobState.setProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
"TestGroup");
+
+    // Create empty task states list
+    List<TaskState> taskStates = new ArrayList<>();
+
+    // Evaluate data quality
+    DataQualityEvaluator.DataQualityEvaluationResult result =
+        DataQualityEvaluator.evaluateDataQuality(taskStates, jobState);
+
+    // Verify results - should handle empty list gracefully
+    Assert.assertEquals(result.getQualityStatus(), DataQualityStatus.PASSED); 
// Default status when no tasks
+    Assert.assertEquals(result.getTotalFiles(), 0);
+    Assert.assertEquals(result.getPassedFiles(), 0);
+    Assert.assertEquals(result.getFailedFiles(), 0);
+    Assert.assertEquals(result.getNonEvaluatedFiles(), 0);
+  }
+}
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobStateTest.java 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobStateTest.java
index 8bcca84b27..b4dcd1960e 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobStateTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobStateTest.java
@@ -194,7 +194,8 @@ public class JobStateTest {
     Assert.assertEquals(jobExecutionInfo.getJobProperties().get("foo"), "bar");
 
     List<String> taskStateIds = Lists.newArrayList();
-    for (TaskExecutionInfo taskExecutionInfo : 
jobExecutionInfo.getTaskExecutions()) {
+    for (int i = 0; i < jobExecutionInfo.getTaskExecutions().size(); i++) {
+      TaskExecutionInfo taskExecutionInfo = 
jobExecutionInfo.getTaskExecutions().get(i);
       Assert.assertEquals(taskExecutionInfo.getJobId(), "TestJob-1");
       Assert.assertEquals(taskExecutionInfo.getStartTime().longValue(), 
this.startTime);
       Assert.assertEquals(taskExecutionInfo.getEndTime().longValue(), 
this.startTime + 1000);
@@ -207,4 +208,5 @@ public class JobStateTest {
     Collections.sort(taskStateIds);
     Assert.assertEquals(taskStateIds, Lists.newArrayList("TestTask-0", 
"TestTask-1", "TestTask-2"));
   }
+
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
index 2cbde03039..366a1408f7 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
@@ -76,8 +76,8 @@ public class GaaSJobObservabilityProducerTest {
         createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
     );
     List<DatasetTaskSummary> summaries = new ArrayList<>();
-    DatasetTaskSummary dataset1 = new DatasetTaskSummary("/testFolder", 100, 
1000, true);
-    DatasetTaskSummary dataset2 = new DatasetTaskSummary("/testFolder2", 1000, 
10000, false);
+    DatasetTaskSummary dataset1 = new DatasetTaskSummary("/testFolder", 100, 
1000, true, "PASSED");
+    DatasetTaskSummary dataset2 = new DatasetTaskSummary("/testFolder2", 1000, 
10000, false, "PASSED");
     summaries.add(dataset1);
     summaries.add(dataset2);
     Properties jobProps = new Properties();
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index b106cdc153..ba44dc6198 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -145,7 +145,7 @@ public class CommitActivityImpl implements CommitActivity {
 
       boolean shouldIncludeFailedTasks = 
PropertiesUtils.getPropAsBoolean(jobState.getProperties(), 
ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
 
-      Map<String, DatasetStats> datasetTaskSummaries = 
summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(), 
shouldIncludeFailedTasks);
+      Map<String, DatasetStats> datasetTaskSummaries = 
summarizeDatasetOutcomes(datasetStatesByUrns, jobContext, 
shouldIncludeFailedTasks);
       return new CommitStats(
           datasetTaskSummaries,
           
datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum(),
@@ -197,7 +197,7 @@ public class CommitActivityImpl implements CommitActivity {
                 @Override
                 public Callable<Void> apply(final Map.Entry<String, 
JobState.DatasetState> entry) {
                   return new SafeDatasetCommit(shouldCommitDataInJob, false, 
deliverySemantics, entry.getKey(),
-                      entry.getValue(), false, jobContext);
+                      entry.getValue(), false, jobContext, 
SafeDatasetCommit.COMMIT_SRC_COMMIT_ACTIVITY_IMPL);
                 }
               }).iterator(), numCommitThreads,
           // TODO: Rewrite executorUtils to use java util optional
@@ -249,8 +249,10 @@ public class CommitActivityImpl implements CommitActivity {
     });
   }
 
-  private Map<String, DatasetStats> summarizeDatasetOutcomes(Map<String, 
JobState.DatasetState> datasetStatesByUrns, JobCommitPolicy commitPolicy, 
boolean shouldIncludeFailedTasks) {
+  private Map<String, DatasetStats> summarizeDatasetOutcomes(Map<String, 
JobState.DatasetState> datasetStatesByUrns, JobContext jobContext, boolean 
shouldIncludeFailedTasks) {
     Map<String, DatasetStats> datasetTaskStats = new HashMap<>();
+    JobCommitPolicy commitPolicy = jobContext.getJobCommitPolicy();
+    JobState jobState = jobContext.getJobState();
     // Only process successful datasets unless configuration to process failed 
datasets is set
     for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
       if (datasetState.getState() == JobState.RunningState.COMMITTED || 
(datasetState.getState() == JobState.RunningState.FAILED
@@ -270,11 +272,11 @@ public class CommitActivityImpl implements CommitActivity 
{
         }
         log.info(String.format("DatasetMetrics for '%s' - (records: %d; bytes: 
%d)", datasetState.getDatasetUrn(),
             totalRecordsWritten, totalBytesWritten));
-        datasetTaskStats.put(datasetState.getDatasetUrn(), new 
DatasetStats(totalRecordsWritten, totalBytesWritten, true, 
totalCommittedTasks));
+        datasetTaskStats.put(datasetState.getDatasetUrn(), new 
DatasetStats(totalRecordsWritten, totalBytesWritten, true, totalCommittedTasks, 
jobState.getDataQualityStatus().name()));
       } else if (datasetState.getState() == JobState.RunningState.FAILED && 
commitPolicy == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
         // Check if config is turned on for submitting writer metrics on 
failure due to non-atomic write semantics
         log.info("Due to task failure, will report that no records or bytes 
were written for " + datasetState.getDatasetUrn());
-        datasetTaskStats.put(datasetState.getDatasetUrn(), new DatasetStats( 
0, 0, false, 0));
+        datasetTaskStats.put(datasetState.getDatasetUrn(), new DatasetStats( 
0, 0, false, 0, jobState.getDataQualityStatus().name()));
       }
     }
     return datasetTaskStats;
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
index e6a0d9e417..f2a854fa7f 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
@@ -37,4 +37,5 @@ public class DatasetStats {
   @NonNull private long bytesWritten;
   @NonNull private boolean successfullyCommitted;
   @NonNull private int numCommittedWorkunits;
+  @NonNull private String dataQualityCheckStatus;
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
index 5b5658b910..56f645fc49 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -61,8 +61,9 @@ public class CommitStepWorkflowImpl implements 
CommitStepWorkflow {
   private List<DatasetTaskSummary> 
convertDatasetStatsToTaskSummaries(Map<String, DatasetStats> datasetStats) {
     List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
     for (Map.Entry<String, DatasetStats> entry : datasetStats.entrySet()) {
-      datasetTaskSummaries.add(new DatasetTaskSummary(entry.getKey(), 
entry.getValue().getRecordsWritten(), entry.getValue().getBytesWritten(), 
entry.getValue().isSuccessfullyCommitted()));
+      datasetTaskSummaries.add(new DatasetTaskSummary(entry.getKey(), 
entry.getValue().getRecordsWritten(), entry.getValue().getBytesWritten(), 
entry.getValue().isSuccessfullyCommitted(), 
entry.getValue().getDataQualityCheckStatus()));
     }
+    log.info("Converted dataset stats to task summaries: {}", 
datasetTaskSummaries);
     return datasetTaskSummaries;
   }
 }

Reply via email to