khandelwal-prateek commented on code in PR #4113:
URL: https://github.com/apache/gobblin/pull/4113#discussion_r2243350309


##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import java.util.Optional;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written 
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+  public static final String COPY_PREFIX = "gobblin.copy";
+  public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+  public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+  public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+    super(state, type);
+  }
+
+  @Override
+  public Result executePolicy() {
+    Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+    if (!bytes.isPresent()) {
+      return Result.FAILED;
+    }
+    Long bytesRead = bytes.get().getBytesRead();
+    Long bytesWritten = bytes.get().getBytesWritten();
+
+    Long sizeDifference = Math.abs(bytesRead - bytesWritten);
+
+    if (sizeDifference == 0) {
+      return Result.PASSED;
+    }
+
+    log.warn("File size check failed - bytes read: {}, bytes written: {}, 
difference: {}",
+        bytesRead, bytesWritten, sizeDifference);
+    return Result.FAILED;
+  }
+
+  @Override
+  public String toString() {
+    Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+    if(bytes.isPresent()) {
+      return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]", 
bytes.get().getBytesRead(), bytes.get().getBytesWritten());
+    } else{
+      return "FileSizePolicy [bytesRead=null, bytesWritten=null]";
+    }
+  }
+
+  /**
+   * Helper class to hold transfer bytes information
+   */
+  @Getter
+  private static class TransferBytes {
+    final Long bytesRead;
+    final Long bytesWritten;
+    TransferBytes(Long bytesRead, Long bytesWritten) {
+      this.bytesRead = bytesRead;
+      this.bytesWritten = bytesWritten;
+    }
+  }
+
+  /**
+   * Extracts bytesRead and bytesWritten from the given state.
+   * Returns null if parsing fails.

Review Comment:
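   Update the Javadoc regarding the `null` return: callers consume the result as an `Optional<TransferBytes>`, so the doc should describe an empty `Optional` on parse failure instead of `null`.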



##########
gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyCheckerTest.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.qualitychecker.task;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.TestTaskLevelPolicy;
+
+@Test
+public class TaskLevelPolicyCheckerTest {
+
+  @Test
+  public void testSinglePolicyPassed() {
+    // Create a state with a single policy that always passes
+    State state = new State();
+    List<TaskLevelPolicy> policies = new ArrayList<>();
+    policies.add(new TestTaskLevelPolicy(state, TaskLevelPolicy.Type.FAIL));
+
+    // Create checker and execute policies
+    TaskLevelPolicyChecker checker = new TaskLevelPolicyChecker(policies);
+    TaskLevelPolicyCheckResults results = checker.executePolicies();
+
+    // Verify results
+    Assert.assertEquals(results.getPolicyResults().size(), 1);
+    for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : 
results.getPolicyResults().entrySet()) {
+      Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.PASSED);
+      Assert.assertEquals(entry.getValue(), TaskLevelPolicy.Type.FAIL);
+    }
+  }
+
+  @Test
+  public void testSinglePolicyFailed() {
+    // Create a state with a single policy that always fails
+    State state = new State();
+    List<TaskLevelPolicy> policies = new ArrayList<>();
+    policies.add(new FailingTaskLevelPolicy(state, TaskLevelPolicy.Type.FAIL));
+
+    // Create checker and execute policies
+    TaskLevelPolicyChecker checker = new TaskLevelPolicyChecker(policies);
+    TaskLevelPolicyCheckResults results = checker.executePolicies();
+
+    // Verify results
+    Assert.assertEquals(results.getPolicyResults().size(), 1);
+    for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : 
results.getPolicyResults().entrySet()) {
+      Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.FAILED);
+      Assert.assertEquals(entry.getValue(), TaskLevelPolicy.Type.FAIL);
+    }
+  }
+
+  @Test
+  public void testMultiplePoliciesMixedResults() {
+    // Create a state with multiple policies having mixed results
+    State state = new State();
+    List<TaskLevelPolicy> policies = new ArrayList<>();
+    policies.add(new TestTaskLevelPolicy(state, TaskLevelPolicy.Type.FAIL)); 
// Passes
+    policies.add(new FailingTaskLevelPolicy(state, 
TaskLevelPolicy.Type.FAIL)); // Fails
+    policies.add(new TestTaskLevelPolicy(state, 
TaskLevelPolicy.Type.OPTIONAL)); // Passes
+
+    // Create checker and execute policies
+    TaskLevelPolicyChecker checker = new TaskLevelPolicyChecker(policies);
+    TaskLevelPolicyCheckResults results = checker.executePolicies();
+
+    // Verify results
+    Assert.assertEquals(results.getPolicyResults().size(), 2);
+    int passedCount = 0;
+    int failedCount = 0;
+    for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : 
results.getPolicyResults().entrySet()) {
+      if (entry.getKey() == TaskLevelPolicy.Result.PASSED) {
+        passedCount++;
+      } else {
+        failedCount++;
+      }
+    }
+    Assert.assertEquals(passedCount, 1);
+    Assert.assertEquals(failedCount, 1);
+  }
+
+  @Test
+  public void testOptionalPolicyFailure() {
+    // Create a state with an optional policy that fails
+    State state = new State();
+    List<TaskLevelPolicy> policies = new ArrayList<>();
+    policies.add(new FailingTaskLevelPolicy(state, 
TaskLevelPolicy.Type.OPTIONAL));
+
+    // Create checker and execute policies
+    TaskLevelPolicyChecker checker = new TaskLevelPolicyChecker(policies);
+    TaskLevelPolicyCheckResults results = checker.executePolicies();
+
+    // Verify results
+    Assert.assertEquals(results.getPolicyResults().size(), 1);
+    for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : 
results.getPolicyResults().entrySet()) {
+      Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.FAILED);
+      Assert.assertEquals(entry.getValue(), TaskLevelPolicy.Type.OPTIONAL);
+    }
+  }
+
+  // Helper class for testing failing policies
+  private static class FailingTaskLevelPolicy extends TaskLevelPolicy {
+    public FailingTaskLevelPolicy(State state, Type type) {
+      super(state, type);
+    }
+
+    @Override
+    public Result executePolicy() {
+      return Result.FAILED;
+    }
+  }
+}

Review Comment:
   nit: add a newline at the end of the file



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}
+
+    /**
+     * Result of a data quality evaluation containing the overall status and 
metrics.
+     */
+    @Getter
+    public static class DataQualityEvaluationResult {
+        private final DataQualityStatus qualityStatus;
+        private final int totalFiles;
+        private final int passedFiles;
+        private final int failedFiles;
+        // Number of files that were not evaluated for data quality for 
example files not found or not processed
+        private final int nonEvaluatedFiles;
+
+        public DataQualityEvaluationResult(DataQualityStatus qualityStatus, 
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+            this.qualityStatus = qualityStatus;
+            this.totalFiles = totalFiles;
+            this.passedFiles = passedFiles;
+            this.failedFiles = failedFiles;
+            this.nonEvaluatedFiles = nonEvaluatedFiles;
+        }

Review Comment:
   this can be replaced with `@AllArgsConstructor` annotation on class



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}

Review Comment:
   The indentation in this file is not right. Can you please fix it to use 2 spaces per indentation level?



##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import java.util.Optional;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written 
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+  public static final String COPY_PREFIX = "gobblin.copy";
+  public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+  public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+  public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+    super(state, type);
+  }
+
+  @Override
+  public Result executePolicy() {
+    Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+    if (!bytes.isPresent()) {
+      return Result.FAILED;
+    }
+    Long bytesRead = bytes.get().getBytesRead();
+    Long bytesWritten = bytes.get().getBytesWritten();
+
+    Long sizeDifference = Math.abs(bytesRead - bytesWritten);
+
+    if (sizeDifference == 0) {
+      return Result.PASSED;
+    }
+
+    log.warn("File size check failed - bytes read: {}, bytes written: {}, 
difference: {}",
+        bytesRead, bytesWritten, sizeDifference);
+    return Result.FAILED;
+  }
+
+  @Override
+  public String toString() {
+    Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+    if(bytes.isPresent()) {
+      return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]", 
bytes.get().getBytesRead(), bytes.get().getBytesWritten());
+    } else{
+      return "FileSizePolicy [bytesRead=null, bytesWritten=null]";
+    }
+  }
+
+  /**
+   * Helper class to hold transfer bytes information
+   */
+  @Getter
+  private static class TransferBytes {
+    final Long bytesRead;
+    final Long bytesWritten;

Review Comment:
   since we are already validating and parsing as long, using boxed Long is not 
required. Using primitives guarantees non-null and avoids unnecessary autoboxing



##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import java.util.Optional;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written 
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+  public static final String COPY_PREFIX = "gobblin.copy";
+  public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+  public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+  public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+    super(state, type);
+  }
+
+  @Override
+  public Result executePolicy() {
+    Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+    if (!bytes.isPresent()) {
+      return Result.FAILED;
+    }
+    Long bytesRead = bytes.get().getBytesRead();
+    Long bytesWritten = bytes.get().getBytesWritten();
+
+    Long sizeDifference = Math.abs(bytesRead - bytesWritten);
+
+    if (sizeDifference == 0) {
+      return Result.PASSED;
+    }
+
+    log.warn("File size check failed - bytes read: {}, bytes written: {}, 
difference: {}",
+        bytesRead, bytesWritten, sizeDifference);
+    return Result.FAILED;
+  }
+
+  @Override
+  public String toString() {
+    Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+    if(bytes.isPresent()) {

Review Comment:
   `TransferBytes transferBytes = 
getBytesReadAndWritten(this.state).orElse(null);`



##########
gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyChecker.java:
##########
@@ -28,20 +29,13 @@
  * executes each one, and then stores the output
  * in a PolicyCheckResults object
  */
+@Getter
 public class TaskLevelPolicyChecker {
-  /**
-   * An enumeration for possible statuses for Data quality checks,
-   * its values will be PASSED, FAILED, in case if data quality check
-   * evaluation is not performed for Job, it will be NOT_EVALUATED
-   */
-  public enum DataQualityStatus {
-    PASSED,
-    FAILED,
-    NOT_EVALUATED
-  }
   private final List<TaskLevelPolicy> list;
   private static final Logger LOG = 
LoggerFactory.getLogger(TaskLevelPolicyChecker.class);
 
+  public static final String TASK_LEVEL_POLICY_RESULT_KEY = 
"gobblin.task.level.policy.result";

Review Comment:
   If there are multiple policies, one of type `OPTIONAL` and another of type `FAIL`, and both fail, the failure of the `FAIL` policy is masked and results in a silent success even though the required policy failed. This happens because the results map is keyed by `Result`, so the second call to
   `results.getPolicyResults().put(result, p.getType());`
   overwrites the entry written by the first.
   
   E.g. if two policies return the following:
   ('FAILED' -> 'FAIL')
   ('FAILED' -> 'OPTIONAL')
   
   we will eventually see only ('FAILED' -> 'OPTIONAL') and will not treat this as a DQ failure.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java:
##########
@@ -90,6 +94,14 @@ public Void call()
     metricContext = Instrumented.getMetricContext(datasetState, 
SafeDatasetCommit.class);
 
     finalizeDatasetStateBeforeCommit(this.datasetState);
+    // evaluate data quality at the dataset commit level, only when commit 
source is CommitActivityImpl
+    
if(SafeDatasetCommit.COMMIT_SRC_COMMIT_ACTIVITY_IMPL.equals(this.datasetCommitSrc)){
+      log.info("Evaluating data quality for commit activity for dataset {}.", 
this.datasetUrn);
+       evaluateAndEmitDatasetQuality();
+    } else {
+      log.warn("Skipping data quality evaluation for dataset {} as commit 
source is {}", this.datasetUrn,
+          this.datasetCommitSrc);

Review Comment:
   In which cases does this go into the `else` branch? Do we want to log this at all? If it is an expected scenario, this should be an `info` log rather than `warn`.



##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import java.util.Optional;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written 
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+  public static final String COPY_PREFIX = "gobblin.copy";
+  public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+  public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+  public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+    super(state, type);
+  }
+
+  @Override
+  public Result executePolicy() {
+    Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+    if (!bytes.isPresent()) {
+      return Result.FAILED;
+    }

Review Comment:
   the `bytes.isPresent() -> bytes.get()` pattern works but can be simplified 
as below:
   ```
   TransferBytes transferBytes = getBytesReadAndWritten(...).orElse(null);
   if (transferBytes == null) {
       return Result.FAILED;
   }
   long bytesRead = transferBytes.getBytesRead();
   ...
   ```



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java:
##########
@@ -90,6 +94,14 @@ public Void call()
     metricContext = Instrumented.getMetricContext(datasetState, 
SafeDatasetCommit.class);
 
     finalizeDatasetStateBeforeCommit(this.datasetState);
+    // evaluate data quality at the dataset commit level, only when commit 
source is CommitActivityImpl
+    
if(SafeDatasetCommit.COMMIT_SRC_COMMIT_ACTIVITY_IMPL.equals(this.datasetCommitSrc)){

Review Comment:
   nit: formatting - add a space after `if` and before `{`, i.e. `if (...) {`



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}
+
+    /**
+     * Result of a data quality evaluation containing the overall status and 
metrics.
+     */
+    @Getter
+    public static class DataQualityEvaluationResult {
+        private final DataQualityStatus qualityStatus;
+        private final int totalFiles;
+        private final int passedFiles;
+        private final int failedFiles;
+        // Number of files that were not evaluated for data quality for 
example files not found or not processed
+        private final int nonEvaluatedFiles;
+
+        public DataQualityEvaluationResult(DataQualityStatus qualityStatus, 
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+            this.qualityStatus = qualityStatus;
+            this.totalFiles = totalFiles;
+            this.passedFiles = passedFiles;
+            this.failedFiles = failedFiles;
+            this.nonEvaluatedFiles = nonEvaluatedFiles;
+        }
+    }
+
+    /**
+     * Evaluates the data quality of a dataset state and stores the result.
+     * This method is specifically designed for dataset-level quality 
evaluation.
+     *
+     * @param datasetState The dataset state to evaluate and update
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState 
jobState) {
+        List<TaskState> taskStates = datasetState.getTaskStates();
+        DataQualityEvaluationResult result = evaluateDataQuality(taskStates, 
jobState);
+
+        // Store the result in the dataset state
+        jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
result.getQualityStatus().name());
+        // Emit dataset-specific metrics
+        emitMetrics(jobState, result.getQualityStatus(), 
result.getTotalFiles(),
+            result.getPassedFiles(), result.getFailedFiles(), 
result.getNonEvaluatedFiles(), datasetState.getDatasetUrn());
+
+        return result;
+    }
+
+    /**
+     * Evaluates the data quality of a set of task states and emits relevant 
metrics.
+     *
+     * @param taskStates List of task states to evaluate
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+        DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED;
+        int totalFiles = 0;
+        int failedFilesCount = 0;
+        int passedFilesCount = 0;
+        int nonEvaluatedFilesCount = 0;
+
+        for (TaskState taskState : taskStates) {
+            totalFiles++;
+
+            // Handle null task states gracefully
+            if (taskState == null) {
+                log.warn("Encountered null task state, skipping data quality 
evaluation for this task");
+                nonEvaluatedFilesCount++;
+                continue;
+            }
+
+            DataQualityStatus taskDataQuality = null;
+            String result = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+            taskDataQuality = DataQualityStatus.fromString(result);
+            if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+                log.debug("Data quality status of this task is: " + 
taskDataQuality);
+                if (DataQualityStatus.PASSED == taskDataQuality) {
+                    passedFilesCount++;
+                } else if (DataQualityStatus.FAILED == taskDataQuality){
+                    failedFilesCount++;
+                    jobDataQualityStatus = DataQualityStatus.FAILED;
+                } else {
+                    log.warn("Unexpected data quality status: " + 
taskDataQuality + " for task: " + taskState.getTaskId());
+                }
+            } else {
+                // Handle files without data quality evaluation
+                nonEvaluatedFilesCount++;
+                log.warn("No data quality evaluation for task: " + 
taskState.getTaskId());
+            }
+        }
+
+        // Log summary of evaluation
+        log.info("Data quality evaluation summary - Total: {}, Passed: {}, 
Failed: {}, Not Evaluated: {}",
+            totalFiles, passedFilesCount, failedFilesCount, 
nonEvaluatedFilesCount);
+        return new DataQualityEvaluationResult(jobDataQualityStatus, 
totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount);
+    }
+
+
+    private static void emitMetrics(JobState jobState, final DataQualityStatus 
jobDataQuality, final int totalFiles,
+            final int passedFilesCount, final int failedFilesCount, final int 
nonEvaluatedFilesCount, final String datasetUrn) {
+        try {
+            // Check if OpenTelemetry is enabled
+            boolean otelEnabled = 
jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
+                
ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED);
+
+            if (!otelEnabled) {
+                log.info("OpenTelemetry metrics disabled, skipping metrics 
emission");
+                return;
+            }
+
+            OpenTelemetryMetricsBase otelMetrics = 
OpenTelemetryMetrics.getInstance(jobState);
+            log.info("OpenTelemetry instance obtained: {}", otelMetrics != 
null);
+
+            if (otelMetrics != null) {
+                Attributes tags = getTagsForDataQualityMetrics(jobState, 
datasetUrn);
+                log.info("Tags for data quality metrics: " + tags.toString());
+                // Emit data quality status (1 for PASSED, 0 for FAILED)
+                log.info("Data quality status for this job is " + 
jobDataQuality);
+                if (jobDataQuality == DataQualityStatus.PASSED) {
+                    log.info("Data quality passed for job: {}", 
jobState.getJobName());
+                    otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)

Review Comment:
   we can avoid repeated getMeter(...) calls and assign the meter to a local 
variable once and reuse it for all metrics:
   ```
   Meter meter = otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME);
   ```



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}
+
+    /**
+     * Result of a data quality evaluation containing the overall status and 
metrics.
+     */
+    @Getter
+    public static class DataQualityEvaluationResult {
+        private final DataQualityStatus qualityStatus;
+        private final int totalFiles;
+        private final int passedFiles;
+        private final int failedFiles;
+        // Number of files that were not evaluated for data quality for 
example files not found or not processed
+        private final int nonEvaluatedFiles;
+
+        public DataQualityEvaluationResult(DataQualityStatus qualityStatus, 
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+            this.qualityStatus = qualityStatus;
+            this.totalFiles = totalFiles;
+            this.passedFiles = passedFiles;
+            this.failedFiles = failedFiles;
+            this.nonEvaluatedFiles = nonEvaluatedFiles;
+        }
+    }
+
+    /**
+     * Evaluates the data quality of a dataset state and stores the result.
+     * This method is specifically designed for dataset-level quality 
evaluation.
+     *
+     * @param datasetState The dataset state to evaluate and update
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState 
jobState) {
+        List<TaskState> taskStates = datasetState.getTaskStates();
+        DataQualityEvaluationResult result = evaluateDataQuality(taskStates, 
jobState);
+
+        // Store the result in the dataset state
+        jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
result.getQualityStatus().name());
+        // Emit dataset-specific metrics
+        emitMetrics(jobState, result.getQualityStatus(), 
result.getTotalFiles(),
+            result.getPassedFiles(), result.getFailedFiles(), 
result.getNonEvaluatedFiles(), datasetState.getDatasetUrn());
+
+        return result;
+    }
+
+    /**
+     * Evaluates the data quality of a set of task states and emits relevant 
metrics.
+     *
+     * @param taskStates List of task states to evaluate
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+        DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED;
+        int totalFiles = 0;
+        int failedFilesCount = 0;
+        int passedFilesCount = 0;
+        int nonEvaluatedFilesCount = 0;
+
+        for (TaskState taskState : taskStates) {
+            totalFiles++;
+
+            // Handle null task states gracefully
+            if (taskState == null) {
+                log.warn("Encountered null task state, skipping data quality 
evaluation for this task");
+                nonEvaluatedFilesCount++;
+                continue;
+            }
+
+            DataQualityStatus taskDataQuality = null;
+            String result = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+            taskDataQuality = DataQualityStatus.fromString(result);
+            if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+                log.debug("Data quality status of this task is: " + 
taskDataQuality);
+                if (DataQualityStatus.PASSED == taskDataQuality) {
+                    passedFilesCount++;
+                } else if (DataQualityStatus.FAILED == taskDataQuality){
+                    failedFilesCount++;
+                    jobDataQualityStatus = DataQualityStatus.FAILED;
+                } else {
+                    log.warn("Unexpected data quality status: " + 
taskDataQuality + " for task: " + taskState.getTaskId());
+                }
+            } else {
+                // Handle files without data quality evaluation
+                nonEvaluatedFilesCount++;
+                log.warn("No data quality evaluation for task: " + 
taskState.getTaskId());
+            }
+        }
+
+        // Log summary of evaluation
+        log.info("Data quality evaluation summary - Total: {}, Passed: {}, 
Failed: {}, Not Evaluated: {}",
+            totalFiles, passedFilesCount, failedFilesCount, 
nonEvaluatedFilesCount);
+        return new DataQualityEvaluationResult(jobDataQualityStatus, 
totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount);
+    }
+
+
+    private static void emitMetrics(JobState jobState, final DataQualityStatus 
jobDataQuality, final int totalFiles,
+            final int passedFilesCount, final int failedFilesCount, final int 
nonEvaluatedFilesCount, final String datasetUrn) {
+        try {
+            // Check if OpenTelemetry is enabled
+            boolean otelEnabled = 
jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
+                
ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED);
+
+            if (!otelEnabled) {
+                log.info("OpenTelemetry metrics disabled, skipping metrics 
emission");
+                return;
+            }
+
+            OpenTelemetryMetricsBase otelMetrics = 
OpenTelemetryMetrics.getInstance(jobState);
+            log.info("OpenTelemetry instance obtained: {}", otelMetrics != 
null);
+
+            if (otelMetrics != null) {
+                Attributes tags = getTagsForDataQualityMetrics(jobState, 
datasetUrn);
+                log.info("Tags for data quality metrics: " + tags.toString());
+                // Emit data quality status (1 for PASSED, 0 for FAILED)
+                log.info("Data quality status for this job is " + 
jobDataQuality);
+                if (jobDataQuality == DataQualityStatus.PASSED) {
+                    log.info("Data quality passed for job: {}", 
jobState.getJobName());
+                    otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+                        
.counterBuilder(ServiceMetricNames.DATA_QUALITY_JOB_SUCCESS_COUNT)
+                        .setDescription("Number of jobs that passed data 
quality")
+                        .build()
+                        .add(1, tags);
+                } else {
+                    log.info("Data quality failed for job: {}", 
jobState.getJobName());
+                    otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+                        
.counterBuilder(ServiceMetricNames.DATA_QUALITY_JOB_FAILURE_COUNT)

Review Comment:
   Since the if/else branches differ only in the metric name, consider 
   determining the metric name first and removing the if/else block, as below:
   ```
   String jobMetricName = (jobDataQuality == DataQualityStatus.PASSED)
           ? ServiceMetricNames.DATA_QUALITY_JOB_SUCCESS_COUNT
           : ServiceMetricNames.DATA_QUALITY_JOB_FAILURE_COUNT;
   ```



##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import java.util.Optional;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written 
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+  public static final String COPY_PREFIX = "gobblin.copy";
+  public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+  public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+  public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+    super(state, type);
+  }
+
+  @Override
+  public Result executePolicy() {
+    Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+    if (!bytes.isPresent()) {
+      return Result.FAILED;
+    }
+    Long bytesRead = bytes.get().getBytesRead();
+    Long bytesWritten = bytes.get().getBytesWritten();
+
+    Long sizeDifference = Math.abs(bytesRead - bytesWritten);
+
+    if (sizeDifference == 0) {
+      return Result.PASSED;
+    }
+
+    log.warn("File size check failed - bytes read: {}, bytes written: {}, 
difference: {}",
+        bytesRead, bytesWritten, sizeDifference);
+    return Result.FAILED;
+  }
+
+  @Override
+  public String toString() {
+    Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+    if(bytes.isPresent()) {
+      return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]", 
bytes.get().getBytesRead(), bytes.get().getBytesWritten());
+    } else{

Review Comment:
   nit: add a space between `else` and `{` (i.e. `} else {`), and update other 
   places similarly. Please refer to 
   https://gobblin.apache.org/docs/developer-guide/CodingStyle/ to import the 
   code style XML into your IDE, which handles this automatically.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}
+
+    /**
+     * Result of a data quality evaluation containing the overall status and 
metrics.
+     */
+    @Getter
+    public static class DataQualityEvaluationResult {
+        private final DataQualityStatus qualityStatus;
+        private final int totalFiles;
+        private final int passedFiles;
+        private final int failedFiles;
+        // Number of files that were not evaluated for data quality for 
example files not found or not processed
+        private final int nonEvaluatedFiles;
+
+        public DataQualityEvaluationResult(DataQualityStatus qualityStatus, 
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+            this.qualityStatus = qualityStatus;
+            this.totalFiles = totalFiles;
+            this.passedFiles = passedFiles;
+            this.failedFiles = failedFiles;
+            this.nonEvaluatedFiles = nonEvaluatedFiles;
+        }
+    }
+
+    /**
+     * Evaluates the data quality of a dataset state and stores the result.
+     * This method is specifically designed for dataset-level quality 
evaluation.
+     *
+     * @param datasetState The dataset state to evaluate and update
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState 
jobState) {
+        List<TaskState> taskStates = datasetState.getTaskStates();
+        DataQualityEvaluationResult result = evaluateDataQuality(taskStates, 
jobState);
+
+        // Store the result in the dataset state
+        jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
result.getQualityStatus().name());
+        // Emit dataset-specific metrics
+        emitMetrics(jobState, result.getQualityStatus(), 
result.getTotalFiles(),
+            result.getPassedFiles(), result.getFailedFiles(), 
result.getNonEvaluatedFiles(), datasetState.getDatasetUrn());
+
+        return result;
+    }
+
+    /**
+     * Evaluates the data quality of a set of task states and emits relevant 
metrics.
+     *
+     * @param taskStates List of task states to evaluate
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+        DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED;
+        int totalFiles = 0;
+        int failedFilesCount = 0;
+        int passedFilesCount = 0;
+        int nonEvaluatedFilesCount = 0;
+
+        for (TaskState taskState : taskStates) {
+            totalFiles++;
+
+            // Handle null task states gracefully
+            if (taskState == null) {
+                log.warn("Encountered null task state, skipping data quality 
evaluation for this task");
+                nonEvaluatedFilesCount++;
+                continue;
+            }
+
+            DataQualityStatus taskDataQuality = null;
+            String result = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+            taskDataQuality = DataQualityStatus.fromString(result);
+            if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+                log.debug("Data quality status of this task is: " + 
taskDataQuality);
+                if (DataQualityStatus.PASSED == taskDataQuality) {
+                    passedFilesCount++;
+                } else if (DataQualityStatus.FAILED == taskDataQuality){
+                    failedFilesCount++;
+                    jobDataQualityStatus = DataQualityStatus.FAILED;
+                } else {
+                    log.warn("Unexpected data quality status: " + 
taskDataQuality + " for task: " + taskState.getTaskId());
+                }
+            } else {
+                // Handle files without data quality evaluation
+                nonEvaluatedFilesCount++;
+                log.warn("No data quality evaluation for task: " + 
taskState.getTaskId());
+            }
+        }
+
+        // Log summary of evaluation
+        log.info("Data quality evaluation summary - Total: {}, Passed: {}, 
Failed: {}, Not Evaluated: {}",
+            totalFiles, passedFilesCount, failedFilesCount, 
nonEvaluatedFilesCount);
+        return new DataQualityEvaluationResult(jobDataQualityStatus, 
totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount);
+    }
+
+
+    private static void emitMetrics(JobState jobState, final DataQualityStatus 
jobDataQuality, final int totalFiles,
+            final int passedFilesCount, final int failedFilesCount, final int 
nonEvaluatedFilesCount, final String datasetUrn) {
+        try {
+            // Check if OpenTelemetry is enabled
+            boolean otelEnabled = 
jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
+                
ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED);
+
+            if (!otelEnabled) {
+                log.info("OpenTelemetry metrics disabled, skipping metrics 
emission");
+                return;
+            }
+
+            OpenTelemetryMetricsBase otelMetrics = 
OpenTelemetryMetrics.getInstance(jobState);
+            log.info("OpenTelemetry instance obtained: {}", otelMetrics != 
null);
+
+            if (otelMetrics != null) {
+                Attributes tags = getTagsForDataQualityMetrics(jobState, 
datasetUrn);
+                log.info("Tags for data quality metrics: " + tags.toString());
+                // Emit data quality status (1 for PASSED, 0 for FAILED)

Review Comment:
   We are emitting different metrics now, so this comment should be updated.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}
+
+    /**
+     * Result of a data quality evaluation containing the overall status and 
metrics.
+     */
+    @Getter
+    public static class DataQualityEvaluationResult {
+        private final DataQualityStatus qualityStatus;
+        private final int totalFiles;
+        private final int passedFiles;
+        private final int failedFiles;
+        // Number of files that were not evaluated for data quality for 
example files not found or not processed
+        private final int nonEvaluatedFiles;
+
+        public DataQualityEvaluationResult(DataQualityStatus qualityStatus, 
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+            this.qualityStatus = qualityStatus;
+            this.totalFiles = totalFiles;
+            this.passedFiles = passedFiles;
+            this.failedFiles = failedFiles;
+            this.nonEvaluatedFiles = nonEvaluatedFiles;
+        }
+    }
+
+    /**
+     * Evaluates the data quality of a dataset state and stores the result.
+     * This method is specifically designed for dataset-level quality 
evaluation.
+     *
+     * @param datasetState The dataset state to evaluate and update
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState 
jobState) {
+        List<TaskState> taskStates = datasetState.getTaskStates();
+        DataQualityEvaluationResult result = evaluateDataQuality(taskStates, 
jobState);
+
+        // Store the result in the dataset state
+        jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
result.getQualityStatus().name());
+        // Emit dataset-specific metrics
+        emitMetrics(jobState, result.getQualityStatus(), 
result.getTotalFiles(),
+            result.getPassedFiles(), result.getFailedFiles(), 
result.getNonEvaluatedFiles(), datasetState.getDatasetUrn());
+
+        return result;
+    }
+
+    /**
+     * Evaluates the data quality of a set of task states and emits relevant 
metrics.
+     *
+     * @param taskStates List of task states to evaluate
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+        DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED;
+        int totalFiles = 0;
+        int failedFilesCount = 0;
+        int passedFilesCount = 0;
+        int nonEvaluatedFilesCount = 0;
+
+        for (TaskState taskState : taskStates) {
+            totalFiles++;
+
+            // Handle null task states gracefully
+            if (taskState == null) {
+                log.warn("Encountered null task state, skipping data quality 
evaluation for this task");
+                nonEvaluatedFilesCount++;
+                continue;
+            }
+
+            DataQualityStatus taskDataQuality = null;
+            String result = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+            taskDataQuality = DataQualityStatus.fromString(result);
+            if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+                log.debug("Data quality status of this task is: " + 
taskDataQuality);
+                if (DataQualityStatus.PASSED == taskDataQuality) {
+                    passedFilesCount++;
+                } else if (DataQualityStatus.FAILED == taskDataQuality){
+                    failedFilesCount++;
+                    jobDataQualityStatus = DataQualityStatus.FAILED;
+                } else {
+                    log.warn("Unexpected data quality status: " + 
taskDataQuality + " for task: " + taskState.getTaskId());
+                }
+            } else {
+                // Handle files without data quality evaluation
+                nonEvaluatedFilesCount++;
+                log.warn("No data quality evaluation for task: " + 
taskState.getTaskId());
+            }
+        }
+
+        // Log summary of evaluation
+        log.info("Data quality evaluation summary - Total: {}, Passed: {}, 
Failed: {}, Not Evaluated: {}",
+            totalFiles, passedFilesCount, failedFilesCount, 
nonEvaluatedFilesCount);
+        return new DataQualityEvaluationResult(jobDataQualityStatus, 
totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount);
+    }
+
+
+    private static void emitMetrics(JobState jobState, final DataQualityStatus 
jobDataQuality, final int totalFiles,
+            final int passedFilesCount, final int failedFilesCount, final int 
nonEvaluatedFilesCount, final String datasetUrn) {
+        try {
+            // Check if OpenTelemetry is enabled
+            boolean otelEnabled = 
jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
+                
ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED);
+
+            if (!otelEnabled) {
+                log.info("OpenTelemetry metrics disabled, skipping metrics 
emission");
+                return;
+            }
+
+            OpenTelemetryMetricsBase otelMetrics = 
OpenTelemetryMetrics.getInstance(jobState);
+            log.info("OpenTelemetry instance obtained: {}", otelMetrics != 
null);
+
+            if (otelMetrics != null) {
+                Attributes tags = getTagsForDataQualityMetrics(jobState, 
datasetUrn);
+                log.info("Tags for data quality metrics: " + tags.toString());
+                // Emit data quality status (1 for PASSED, 0 for FAILED)
+                log.info("Data quality status for this job is " + 
jobDataQuality);

Review Comment:
   We can combine the multiple logging statements into one:
   ```
   log.info("Emitting DQ metrics for job={}, status={}, tags={}", 
jobState.getJobName(), jobDataQuality, tags);
   ```



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java:
##########
@@ -613,6 +629,11 @@ private boolean checkDataQuality(Optional<Object> schema)
       TaskLevelPolicyCheckResults taskResults =
           this.taskContext.getTaskLevelPolicyChecker(this.forkTaskState, 
this.branches > 1 ? this.index : -1)
               .executePolicies();
+      boolean allRequiredPoliciesPassed = 
taskResults.getPolicyResults().entrySet().stream()
+          .filter(e -> e.getValue() == TaskLevelPolicy.Type.FAIL)
+          .allMatch(e -> e.getKey() == TaskLevelPolicy.Result.PASSED);
+      forkTaskState.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY,
+          allRequiredPoliciesPassed ? DataQualityStatus.PASSED.name() : 
DataQualityStatus.FAILED.name());

Review Comment:
   We want to mark the DQ status as failed if any policy fails, right? If so, 
   shouldn't we check that there is no `Result.FAILED` key and that 
   `Result.PASSED` is present, and based on that mark the DQ status as 
   `DataQualityStatus.PASSED`?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}
+
+    /**
+     * Result of a data quality evaluation containing the overall status and 
metrics.
+     */
+    @Getter
+    public static class DataQualityEvaluationResult {
+        private final DataQualityStatus qualityStatus;
+        private final int totalFiles;
+        private final int passedFiles;
+        private final int failedFiles;
+        // Number of files that were not evaluated for data quality for 
example files not found or not processed
+        private final int nonEvaluatedFiles;
+
+        public DataQualityEvaluationResult(DataQualityStatus qualityStatus, 
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+            this.qualityStatus = qualityStatus;
+            this.totalFiles = totalFiles;
+            this.passedFiles = passedFiles;
+            this.failedFiles = failedFiles;
+            this.nonEvaluatedFiles = nonEvaluatedFiles;
+        }
+    }
+
+    /**
+     * Evaluates the data quality of a dataset state and stores the result.
+     * This method is specifically designed for dataset-level quality 
evaluation.
+     *
+     * @param datasetState The dataset state to evaluate and update
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState 
jobState) {
+        List<TaskState> taskStates = datasetState.getTaskStates();
+        DataQualityEvaluationResult result = evaluateDataQuality(taskStates, 
jobState);
+
+        // Store the result in the dataset state
+        jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
result.getQualityStatus().name());
+        // Emit dataset-specific metrics
+        emitMetrics(jobState, result.getQualityStatus(), 
result.getTotalFiles(),
+            result.getPassedFiles(), result.getFailedFiles(), 
result.getNonEvaluatedFiles(), datasetState.getDatasetUrn());
+
+        return result;
+    }
+
+    /**
+     * Evaluates the data quality of a set of task states and emits relevant 
metrics.
+     *
+     * @param taskStates List of task states to evaluate
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+        DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED;
+        int totalFiles = 0;
+        int failedFilesCount = 0;
+        int passedFilesCount = 0;
+        int nonEvaluatedFilesCount = 0;
+
+        for (TaskState taskState : taskStates) {
+            totalFiles++;
+
+            // Handle null task states gracefully
+            if (taskState == null) {
+                log.warn("Encountered null task state, skipping data quality 
evaluation for this task");
+                nonEvaluatedFilesCount++;
+                continue;
+            }
+
+            DataQualityStatus taskDataQuality = null;
+            String result = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+            taskDataQuality = DataQualityStatus.fromString(result);
+            if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+                log.debug("Data quality status of this task is: " + 
taskDataQuality);
+                if (DataQualityStatus.PASSED == taskDataQuality) {
+                    passedFilesCount++;
+                } else if (DataQualityStatus.FAILED == taskDataQuality){
+                    failedFilesCount++;
+                    jobDataQualityStatus = DataQualityStatus.FAILED;
+                } else {
+                    log.warn("Unexpected data quality status: " + 
taskDataQuality + " for task: " + taskState.getTaskId());
+                }
+            } else {
+                // Handle files without data quality evaluation
+                nonEvaluatedFilesCount++;
+                log.warn("No data quality evaluation for task: " + 
taskState.getTaskId());
+            }
+        }
+
+        // Log summary of evaluation
+        log.info("Data quality evaluation summary - Total: {}, Passed: {}, 
Failed: {}, Not Evaluated: {}",
+            totalFiles, passedFilesCount, failedFilesCount, 
nonEvaluatedFilesCount);
+        return new DataQualityEvaluationResult(jobDataQualityStatus, 
totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount);
+    }
+
+
+    private static void emitMetrics(JobState jobState, final DataQualityStatus 
jobDataQuality, final int totalFiles,
+            final int passedFilesCount, final int failedFilesCount, final int 
nonEvaluatedFilesCount, final String datasetUrn) {
+        try {
+            // Check if OpenTelemetry is enabled
+            boolean otelEnabled = 
jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
+                
ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED);
+
+            if (!otelEnabled) {
+                log.info("OpenTelemetry metrics disabled, skipping metrics 
emission");
+                return;
+            }
+
+            OpenTelemetryMetricsBase otelMetrics = 
OpenTelemetryMetrics.getInstance(jobState);
+            log.info("OpenTelemetry instance obtained: {}", otelMetrics != 
null);
+
+            if (otelMetrics != null) {

Review Comment:
   We can check for `if (otelMetrics == null)` and return early, instead of 
wrapping the remaining logic in `if (otelMetrics != null)`. That simplifies the 
rest of the method by reducing indentation and nesting, making the main logic 
easier to follow.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to