[ 
https://issues.apache.org/jira/browse/GOBBLIN-2204?focusedWorklogId=977040&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-977040
 ]

ASF GitHub Bot logged work on GOBBLIN-2204:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 31/Jul/25 05:19
            Start Date: 31/Jul/25 05:19
    Worklog Time Spent: 10m 
      Work Description: khandelwal-prateek commented on code in PR #4113:
URL: https://github.com/apache/gobblin/pull/4113#discussion_r2243350309


##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import java.util.Optional;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written 
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+  public static final String COPY_PREFIX = "gobblin.copy";
+  public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+  public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+  public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+    super(state, type);
+  }
+
+  @Override
+  public Result executePolicy() {
+    Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+    if (!bytes.isPresent()) {
+      return Result.FAILED;
+    }
+    Long bytesRead = bytes.get().getBytesRead();
+    Long bytesWritten = bytes.get().getBytesWritten();
+
+    Long sizeDifference = Math.abs(bytesRead - bytesWritten);
+
+    if (sizeDifference == 0) {
+      return Result.PASSED;
+    }
+
+    log.warn("File size check failed - bytes read: {}, bytes written: {}, 
difference: {}",
+        bytesRead, bytesWritten, sizeDifference);
+    return Result.FAILED;
+  }
+
+  @Override
+  public String toString() {
+    Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+    if(bytes.isPresent()) {
+      return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]", 
bytes.get().getBytesRead(), bytes.get().getBytesWritten());
+    } else{
+      return "FileSizePolicy [bytesRead=null, bytesWritten=null]";
+    }
+  }
+
+  /**
+   * Helper class to hold transfer bytes information
+   */
+  @Getter
+  private static class TransferBytes {
+    final Long bytesRead;
+    final Long bytesWritten;
+    TransferBytes(Long bytesRead, Long bytesWritten) {
+      this.bytesRead = bytesRead;
+      this.bytesWritten = bytesWritten;
+    }
+  }
+
+  /**
+   * Extracts bytesRead and bytesWritten from the given state.
+   * Returns null if parsing fails.

Review Comment:
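   Update the Javadoc regarding the null return: this method returns an `Optional`, so it should say that it returns `Optional.empty()` (not null) when parsing fails.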



##########
gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyCheckerTest.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.qualitychecker.task;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.TestTaskLevelPolicy;
+
+@Test
+public class TaskLevelPolicyCheckerTest {
+
+  @Test
+  public void testSinglePolicyPassed() {
+    // Create a state with a single policy that always passes
+    State state = new State();
+    List<TaskLevelPolicy> policies = new ArrayList<>();
+    policies.add(new TestTaskLevelPolicy(state, TaskLevelPolicy.Type.FAIL));
+
+    // Create checker and execute policies
+    TaskLevelPolicyChecker checker = new TaskLevelPolicyChecker(policies);
+    TaskLevelPolicyCheckResults results = checker.executePolicies();
+
+    // Verify results
+    Assert.assertEquals(results.getPolicyResults().size(), 1);
+    for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : 
results.getPolicyResults().entrySet()) {
+      Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.PASSED);
+      Assert.assertEquals(entry.getValue(), TaskLevelPolicy.Type.FAIL);
+    }
+  }
+
+  @Test
+  public void testSinglePolicyFailed() {
+    // Create a state with a single policy that always fails
+    State state = new State();
+    List<TaskLevelPolicy> policies = new ArrayList<>();
+    policies.add(new FailingTaskLevelPolicy(state, TaskLevelPolicy.Type.FAIL));
+
+    // Create checker and execute policies
+    TaskLevelPolicyChecker checker = new TaskLevelPolicyChecker(policies);
+    TaskLevelPolicyCheckResults results = checker.executePolicies();
+
+    // Verify results
+    Assert.assertEquals(results.getPolicyResults().size(), 1);
+    for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : 
results.getPolicyResults().entrySet()) {
+      Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.FAILED);
+      Assert.assertEquals(entry.getValue(), TaskLevelPolicy.Type.FAIL);
+    }
+  }
+
+  @Test
+  public void testMultiplePoliciesMixedResults() {
+    // Create a state with multiple policies having mixed results
+    State state = new State();
+    List<TaskLevelPolicy> policies = new ArrayList<>();
+    policies.add(new TestTaskLevelPolicy(state, TaskLevelPolicy.Type.FAIL)); 
// Passes
+    policies.add(new FailingTaskLevelPolicy(state, 
TaskLevelPolicy.Type.FAIL)); // Fails
+    policies.add(new TestTaskLevelPolicy(state, 
TaskLevelPolicy.Type.OPTIONAL)); // Passes
+
+    // Create checker and execute policies
+    TaskLevelPolicyChecker checker = new TaskLevelPolicyChecker(policies);
+    TaskLevelPolicyCheckResults results = checker.executePolicies();
+
+    // Verify results
+    Assert.assertEquals(results.getPolicyResults().size(), 2);
+    int passedCount = 0;
+    int failedCount = 0;
+    for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : 
results.getPolicyResults().entrySet()) {
+      if (entry.getKey() == TaskLevelPolicy.Result.PASSED) {
+        passedCount++;
+      } else {
+        failedCount++;
+      }
+    }
+    Assert.assertEquals(passedCount, 1);
+    Assert.assertEquals(failedCount, 1);
+  }
+
+  @Test
+  public void testOptionalPolicyFailure() {
+    // Create a state with an optional policy that fails
+    State state = new State();
+    List<TaskLevelPolicy> policies = new ArrayList<>();
+    policies.add(new FailingTaskLevelPolicy(state, 
TaskLevelPolicy.Type.OPTIONAL));
+
+    // Create checker and execute policies
+    TaskLevelPolicyChecker checker = new TaskLevelPolicyChecker(policies);
+    TaskLevelPolicyCheckResults results = checker.executePolicies();
+
+    // Verify results
+    Assert.assertEquals(results.getPolicyResults().size(), 1);
+    for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : 
results.getPolicyResults().entrySet()) {
+      Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.FAILED);
+      Assert.assertEquals(entry.getValue(), TaskLevelPolicy.Type.OPTIONAL);
+    }
+  }
+
+  // Helper class for testing failing policies
+  private static class FailingTaskLevelPolicy extends TaskLevelPolicy {
+    public FailingTaskLevelPolicy(State state, Type type) {
+      super(state, type);
+    }
+
+    @Override
+    public Result executePolicy() {
+      return Result.FAILED;
+    }
+  }
+}

Review Comment:
   nit: add a newline at the end of the file.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}
+
+    /**
+     * Result of a data quality evaluation containing the overall status and 
metrics.
+     */
+    @Getter
+    public static class DataQualityEvaluationResult {
+        private final DataQualityStatus qualityStatus;
+        private final int totalFiles;
+        private final int passedFiles;
+        private final int failedFiles;
+        // Number of files that were not evaluated for data quality for 
example files not found or not processed
+        private final int nonEvaluatedFiles;
+
+        public DataQualityEvaluationResult(DataQualityStatus qualityStatus, 
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+            this.qualityStatus = qualityStatus;
+            this.totalFiles = totalFiles;
+            this.passedFiles = passedFiles;
+            this.failedFiles = failedFiles;
+            this.nonEvaluatedFiles = nonEvaluatedFiles;
+        }

Review Comment:
   This constructor can be replaced with Lombok's `@AllArgsConstructor` annotation on the class.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}

Review Comment:
   The indentation in this file is incorrect (it uses 4 spaces). Please fix it to use 2 spaces per indentation level.



##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import java.util.Optional;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written 
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+  public static final String COPY_PREFIX = "gobblin.copy";
+  public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+  public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+  public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+    super(state, type);
+  }
+
+  @Override
+  public Result executePolicy() {
+    Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+    if (!bytes.isPresent()) {
+      return Result.FAILED;
+    }
+    Long bytesRead = bytes.get().getBytesRead();
+    Long bytesWritten = bytes.get().getBytesWritten();
+
+    Long sizeDifference = Math.abs(bytesRead - bytesWritten);
+
+    if (sizeDifference == 0) {
+      return Result.PASSED;
+    }
+
+    log.warn("File size check failed - bytes read: {}, bytes written: {}, 
difference: {}",
+        bytesRead, bytesWritten, sizeDifference);
+    return Result.FAILED;
+  }
+
+  @Override
+  public String toString() {
+    Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+    if(bytes.isPresent()) {
+      return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]", 
bytes.get().getBytesRead(), bytes.get().getBytesWritten());
+    } else{
+      return "FileSizePolicy [bytesRead=null, bytesWritten=null]";
+    }
+  }
+
+  /**
+   * Helper class to hold transfer bytes information
+   */
+  @Getter
+  private static class TransferBytes {
+    final Long bytesRead;
+    final Long bytesWritten;

Review Comment:
   Since we already validate and parse these values as `long`, the boxed `Long` type is not required. Using primitives guarantees non-null values and avoids unnecessary autoboxing.



##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import java.util.Optional;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written 
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+  public static final String COPY_PREFIX = "gobblin.copy";
+  public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+  public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+  public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+    super(state, type);
+  }
+
+  @Override
+  public Result executePolicy() {
+    Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+    if (!bytes.isPresent()) {
+      return Result.FAILED;
+    }
+    Long bytesRead = bytes.get().getBytesRead();
+    Long bytesWritten = bytes.get().getBytesWritten();
+
+    Long sizeDifference = Math.abs(bytesRead - bytesWritten);
+
+    if (sizeDifference == 0) {
+      return Result.PASSED;
+    }
+
+    log.warn("File size check failed - bytes read: {}, bytes written: {}, 
difference: {}",
+        bytesRead, bytesWritten, sizeDifference);
+    return Result.FAILED;
+  }
+
+  @Override
+  public String toString() {
+    Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+    if(bytes.isPresent()) {

Review Comment:
   This can be simplified to `TransferBytes transferBytes = getBytesReadAndWritten(this.state).orElse(null);`



##########
gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyChecker.java:
##########
@@ -28,20 +29,13 @@
  * executes each one, and then stores the output
  * in a PolicyCheckResults object
  */
+@Getter
 public class TaskLevelPolicyChecker {
-  /**
-   * An enumeration for possible statuses for Data quality checks,
-   * its values will be PASSED, FAILED, in case if data quality check
-   * evaluation is not performed for Job, it will be NOT_EVALUATED
-   */
-  public enum DataQualityStatus {
-    PASSED,
-    FAILED,
-    NOT_EVALUATED
-  }
   private final List<TaskLevelPolicy> list;
   private static final Logger LOG = 
LoggerFactory.getLogger(TaskLevelPolicyChecker.class);
 
+  public static final String TASK_LEVEL_POLICY_RESULT_KEY = 
"gobblin.task.level.policy.result";

Review Comment:
   If there are multiple policies, one of type `OPTIONAL` and another of type `FAIL`, the failure of the `FAIL` policy can be masked. The result is a silent success even when the required policy is failing. This happens because the results map is keyed by the result, so a later entry overwrites an earlier one:
   `results.getPolicyResults().put(result, p.getType());`
   
   e.g. if two policies return the following:
   (`FAILED` -> `FAIL`)
   (`FAILED` -> `OPTIONAL`)
   
   we will eventually see only (`FAILED` -> `OPTIONAL`) and will not treat this as a data quality (DQ) failure.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java:
##########
@@ -90,6 +94,14 @@ public Void call()
     metricContext = Instrumented.getMetricContext(datasetState, 
SafeDatasetCommit.class);
 
     finalizeDatasetStateBeforeCommit(this.datasetState);
+    // evaluate data quality at the dataset commit level, only when commit 
source is CommitActivityImpl
+    
if(SafeDatasetCommit.COMMIT_SRC_COMMIT_ACTIVITY_IMPL.equals(this.datasetCommitSrc)){
+      log.info("Evaluating data quality for commit activity for dataset {}.", 
this.datasetUrn);
+       evaluateAndEmitDatasetQuality();
+    } else {
+      log.warn("Skipping data quality evaluation for dataset {} as commit 
source is {}", this.datasetUrn,
+          this.datasetCommitSrc);

Review Comment:
   In which cases does execution reach the `else` branch? Do we want to log this at all? If it is an expected scenario, this should be an `info` log rather than a `warn`.



##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import java.util.Optional;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written 
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+  public static final String COPY_PREFIX = "gobblin.copy";
+  public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+  public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+  public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+    super(state, type);
+  }
+
+  @Override
+  public Result executePolicy() {
+    Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+    if (!bytes.isPresent()) {
+      return Result.FAILED;
+    }

Review Comment:
   The `bytes.isPresent()` -> `bytes.get()` pattern works, but it can be simplified as below:
   ```
   TransferBytes transferBytes = getBytesReadAndWritten(...).orElse(null);
   if (transferBytes == null) {
       return Result.FAILED;
   }
   long bytesRead = transferBytes.getBytesRead();
   ...
   ```



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java:
##########
@@ -90,6 +94,14 @@ public Void call()
     metricContext = Instrumented.getMetricContext(datasetState, 
SafeDatasetCommit.class);
 
     finalizeDatasetStateBeforeCommit(this.datasetState);
+    // evaluate data quality at the dataset commit level, only when commit 
source is CommitActivityImpl
+    
if(SafeDatasetCommit.COMMIT_SRC_COMMIT_ACTIVITY_IMPL.equals(this.datasetCommitSrc)){

Review Comment:
   nit: add spaces around the condition so it reads `if (...) {` instead of `if(...){`.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}
+
+    /**
+     * Result of a data quality evaluation containing the overall status and 
metrics.
+     */
+    @Getter
+    public static class DataQualityEvaluationResult {
+        private final DataQualityStatus qualityStatus;
+        private final int totalFiles;
+        private final int passedFiles;
+        private final int failedFiles;
+        // Number of files that were not evaluated for data quality for 
example files not found or not processed
+        private final int nonEvaluatedFiles;
+
+        public DataQualityEvaluationResult(DataQualityStatus qualityStatus, 
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+            this.qualityStatus = qualityStatus;
+            this.totalFiles = totalFiles;
+            this.passedFiles = passedFiles;
+            this.failedFiles = failedFiles;
+            this.nonEvaluatedFiles = nonEvaluatedFiles;
+        }
+    }
+
+    /**
+     * Evaluates the data quality of a dataset state and stores the result.
+     * This method is specifically designed for dataset-level quality 
evaluation.
+     *
+     * @param datasetState The dataset state to evaluate and update
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState 
jobState) {
+        List<TaskState> taskStates = datasetState.getTaskStates();
+        DataQualityEvaluationResult result = evaluateDataQuality(taskStates, 
jobState);
+
+        // Store the result in the dataset state
+        jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
result.getQualityStatus().name());
+        // Emit dataset-specific metrics
+        emitMetrics(jobState, result.getQualityStatus(), 
result.getTotalFiles(),
+            result.getPassedFiles(), result.getFailedFiles(), 
result.getNonEvaluatedFiles(), datasetState.getDatasetUrn());
+
+        return result;
+    }
+
+    /**
+     * Evaluates the data quality of a set of task states and emits relevant 
metrics.
+     *
+     * @param taskStates List of task states to evaluate
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+        DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED;
+        int totalFiles = 0;
+        int failedFilesCount = 0;
+        int passedFilesCount = 0;
+        int nonEvaluatedFilesCount = 0;
+
+        for (TaskState taskState : taskStates) {
+            totalFiles++;
+
+            // Handle null task states gracefully
+            if (taskState == null) {
+                log.warn("Encountered null task state, skipping data quality 
evaluation for this task");
+                nonEvaluatedFilesCount++;
+                continue;
+            }
+
+            DataQualityStatus taskDataQuality = null;
+            String result = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+            taskDataQuality = DataQualityStatus.fromString(result);
+            if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+                log.debug("Data quality status of this task is: " + 
taskDataQuality);
+                if (DataQualityStatus.PASSED == taskDataQuality) {
+                    passedFilesCount++;
+                } else if (DataQualityStatus.FAILED == taskDataQuality){
+                    failedFilesCount++;
+                    jobDataQualityStatus = DataQualityStatus.FAILED;
+                } else {
+                    log.warn("Unexpected data quality status: " + 
taskDataQuality + " for task: " + taskState.getTaskId());
+                }
+            } else {
+                // Handle files without data quality evaluation
+                nonEvaluatedFilesCount++;
+                log.warn("No data quality evaluation for task: " + 
taskState.getTaskId());
+            }
+        }
+
+        // Log summary of evaluation
+        log.info("Data quality evaluation summary - Total: {}, Passed: {}, 
Failed: {}, Not Evaluated: {}",
+            totalFiles, passedFilesCount, failedFilesCount, 
nonEvaluatedFilesCount);
+        return new DataQualityEvaluationResult(jobDataQualityStatus, 
totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount);
+    }
+
+
+    private static void emitMetrics(JobState jobState, final DataQualityStatus 
jobDataQuality, final int totalFiles,
+            final int passedFilesCount, final int failedFilesCount, final int 
nonEvaluatedFilesCount, final String datasetUrn) {
+        try {
+            // Check if OpenTelemetry is enabled
+            boolean otelEnabled = 
jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
+                
ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED);
+
+            if (!otelEnabled) {
+                log.info("OpenTelemetry metrics disabled, skipping metrics 
emission");
+                return;
+            }
+
+            OpenTelemetryMetricsBase otelMetrics = 
OpenTelemetryMetrics.getInstance(jobState);
+            log.info("OpenTelemetry instance obtained: {}", otelMetrics != 
null);
+
+            if (otelMetrics != null) {
+                Attributes tags = getTagsForDataQualityMetrics(jobState, 
datasetUrn);
+                log.info("Tags for data quality metrics: " + tags.toString());
+                // Emit data quality status (1 for PASSED, 0 for FAILED)
+                log.info("Data quality status for this job is " + 
jobDataQuality);
+                if (jobDataQuality == DataQualityStatus.PASSED) {
+                    log.info("Data quality passed for job: {}", 
jobState.getJobName());
+                    otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)

Review Comment:
   We can avoid repeated `getMeter(...)` calls by assigning the meter to a local variable once and reusing it for all metrics:
   ```
   Meter meter = otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME);
   ```



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}
+
+    /**
+     * Result of a data quality evaluation containing the overall status and 
metrics.
+     */
+    @Getter
+    public static class DataQualityEvaluationResult {
+        private final DataQualityStatus qualityStatus;
+        private final int totalFiles;
+        private final int passedFiles;
+        private final int failedFiles;
+        // Number of files that were not evaluated for data quality for 
example files not found or not processed
+        private final int nonEvaluatedFiles;
+
+        public DataQualityEvaluationResult(DataQualityStatus qualityStatus, 
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+            this.qualityStatus = qualityStatus;
+            this.totalFiles = totalFiles;
+            this.passedFiles = passedFiles;
+            this.failedFiles = failedFiles;
+            this.nonEvaluatedFiles = nonEvaluatedFiles;
+        }
+    }
+
+    /**
+     * Evaluates the data quality of a dataset state and stores the result.
+     * This method is specifically designed for dataset-level quality 
evaluation.
+     *
+     * @param datasetState The dataset state to evaluate and update
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState 
jobState) {
+        List<TaskState> taskStates = datasetState.getTaskStates();
+        DataQualityEvaluationResult result = evaluateDataQuality(taskStates, 
jobState);
+
+        // Store the result in the dataset state
+        jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
result.getQualityStatus().name());
+        // Emit dataset-specific metrics
+        emitMetrics(jobState, result.getQualityStatus(), 
result.getTotalFiles(),
+            result.getPassedFiles(), result.getFailedFiles(), 
result.getNonEvaluatedFiles(), datasetState.getDatasetUrn());
+
+        return result;
+    }
+
+    /**
+     * Evaluates the data quality of a set of task states and emits relevant 
metrics.
+     *
+     * @param taskStates List of task states to evaluate
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+        DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED;
+        int totalFiles = 0;
+        int failedFilesCount = 0;
+        int passedFilesCount = 0;
+        int nonEvaluatedFilesCount = 0;
+
+        for (TaskState taskState : taskStates) {
+            totalFiles++;
+
+            // Handle null task states gracefully
+            if (taskState == null) {
+                log.warn("Encountered null task state, skipping data quality 
evaluation for this task");
+                nonEvaluatedFilesCount++;
+                continue;
+            }
+
+            DataQualityStatus taskDataQuality = null;
+            String result = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+            taskDataQuality = DataQualityStatus.fromString(result);
+            if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+                log.debug("Data quality status of this task is: " + 
taskDataQuality);
+                if (DataQualityStatus.PASSED == taskDataQuality) {
+                    passedFilesCount++;
+                } else if (DataQualityStatus.FAILED == taskDataQuality){
+                    failedFilesCount++;
+                    jobDataQualityStatus = DataQualityStatus.FAILED;
+                } else {
+                    log.warn("Unexpected data quality status: " + 
taskDataQuality + " for task: " + taskState.getTaskId());
+                }
+            } else {
+                // Handle files without data quality evaluation
+                nonEvaluatedFilesCount++;
+                log.warn("No data quality evaluation for task: " + 
taskState.getTaskId());
+            }
+        }
+
+        // Log summary of evaluation
+        log.info("Data quality evaluation summary - Total: {}, Passed: {}, 
Failed: {}, Not Evaluated: {}",
+            totalFiles, passedFilesCount, failedFilesCount, 
nonEvaluatedFilesCount);
+        return new DataQualityEvaluationResult(jobDataQualityStatus, 
totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount);
+    }
+
+
+    private static void emitMetrics(JobState jobState, final DataQualityStatus 
jobDataQuality, final int totalFiles,
+            final int passedFilesCount, final int failedFilesCount, final int 
nonEvaluatedFilesCount, final String datasetUrn) {
+        try {
+            // Check if OpenTelemetry is enabled
+            boolean otelEnabled = 
jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
+                
ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED);
+
+            if (!otelEnabled) {
+                log.info("OpenTelemetry metrics disabled, skipping metrics 
emission");
+                return;
+            }
+
+            OpenTelemetryMetricsBase otelMetrics = 
OpenTelemetryMetrics.getInstance(jobState);
+            log.info("OpenTelemetry instance obtained: {}", otelMetrics != 
null);
+
+            if (otelMetrics != null) {
+                Attributes tags = getTagsForDataQualityMetrics(jobState, 
datasetUrn);
+                log.info("Tags for data quality metrics: " + tags.toString());
+                // Emit data quality status (1 for PASSED, 0 for FAILED)
+                log.info("Data quality status for this job is " + 
jobDataQuality);
+                if (jobDataQuality == DataQualityStatus.PASSED) {
+                    log.info("Data quality passed for job: {}", 
jobState.getJobName());
+                    otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+                        
.counterBuilder(ServiceMetricNames.DATA_QUALITY_JOB_SUCCESS_COUNT)
+                        .setDescription("Number of jobs that passed data 
quality")
+                        .build()
+                        .add(1, tags);
+                } else {
+                    log.info("Data quality failed for job: {}", 
jobState.getJobName());
+                    otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+                        
.counterBuilder(ServiceMetricNames.DATA_QUALITY_JOB_FAILURE_COUNT)

Review Comment:
   Since the if/else branches differ only in the metric name, consider determining the metric name first and removing this if/else block, e.g.:
   ```
   String jobMetricName = (jobDataQuality == DataQualityStatus.PASSED)
           ? ServiceMetricNames.DATA_QUALITY_JOB_SUCCESS_COUNT
           : ServiceMetricNames.DATA_QUALITY_JOB_FAILURE_COUNT;
   ```



##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import java.util.Optional;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written 
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+  public static final String COPY_PREFIX = "gobblin.copy";
+  public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+  public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+  public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+    super(state, type);
+  }
+
+  @Override
+  public Result executePolicy() {
+    Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+    if (!bytes.isPresent()) {
+      return Result.FAILED;
+    }
+    Long bytesRead = bytes.get().getBytesRead();
+    Long bytesWritten = bytes.get().getBytesWritten();
+
+    Long sizeDifference = Math.abs(bytesRead - bytesWritten);
+
+    if (sizeDifference == 0) {
+      return Result.PASSED;
+    }
+
+    log.warn("File size check failed - bytes read: {}, bytes written: {}, 
difference: {}",
+        bytesRead, bytesWritten, sizeDifference);
+    return Result.FAILED;
+  }
+
+  @Override
+  public String toString() {
+    Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+    if(bytes.isPresent()) {
+      return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]", 
bytes.get().getBytesRead(), bytes.get().getBytesWritten());
+    } else{

Review Comment:
   nit: add a space between `else` and `{` (i.e. `} else {`), and update the other occurrences similarly. Please refer to https://gobblin.apache.org/docs/developer-guide/CodingStyle/ to import the code-style XML into your IDE, which handles this automatically.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}
+
+    /**
+     * Result of a data quality evaluation containing the overall status and 
metrics.
+     */
+    @Getter
+    public static class DataQualityEvaluationResult {
+        private final DataQualityStatus qualityStatus;
+        private final int totalFiles;
+        private final int passedFiles;
+        private final int failedFiles;
+        // Number of files that were not evaluated for data quality for 
example files not found or not processed
+        private final int nonEvaluatedFiles;
+
+        public DataQualityEvaluationResult(DataQualityStatus qualityStatus, 
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+            this.qualityStatus = qualityStatus;
+            this.totalFiles = totalFiles;
+            this.passedFiles = passedFiles;
+            this.failedFiles = failedFiles;
+            this.nonEvaluatedFiles = nonEvaluatedFiles;
+        }
+    }
+
+    /**
+     * Evaluates the data quality of a dataset state and stores the result.
+     * This method is specifically designed for dataset-level quality 
evaluation.
+     *
+     * @param datasetState The dataset state to evaluate and update
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState 
jobState) {
+        List<TaskState> taskStates = datasetState.getTaskStates();
+        DataQualityEvaluationResult result = evaluateDataQuality(taskStates, 
jobState);
+
+        // Store the result in the dataset state
+        jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
result.getQualityStatus().name());
+        // Emit dataset-specific metrics
+        emitMetrics(jobState, result.getQualityStatus(), 
result.getTotalFiles(),
+            result.getPassedFiles(), result.getFailedFiles(), 
result.getNonEvaluatedFiles(), datasetState.getDatasetUrn());
+
+        return result;
+    }
+
+    /**
+     * Evaluates the data quality of a set of task states and emits relevant 
metrics.
+     *
+     * @param taskStates List of task states to evaluate
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+        DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED;
+        int totalFiles = 0;
+        int failedFilesCount = 0;
+        int passedFilesCount = 0;
+        int nonEvaluatedFilesCount = 0;
+
+        for (TaskState taskState : taskStates) {
+            totalFiles++;
+
+            // Handle null task states gracefully
+            if (taskState == null) {
+                log.warn("Encountered null task state, skipping data quality 
evaluation for this task");
+                nonEvaluatedFilesCount++;
+                continue;
+            }
+
+            DataQualityStatus taskDataQuality = null;
+            String result = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+            taskDataQuality = DataQualityStatus.fromString(result);
+            if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+                log.debug("Data quality status of this task is: " + 
taskDataQuality);
+                if (DataQualityStatus.PASSED == taskDataQuality) {
+                    passedFilesCount++;
+                } else if (DataQualityStatus.FAILED == taskDataQuality){
+                    failedFilesCount++;
+                    jobDataQualityStatus = DataQualityStatus.FAILED;
+                } else {
+                    log.warn("Unexpected data quality status: " + 
taskDataQuality + " for task: " + taskState.getTaskId());
+                }
+            } else {
+                // Handle files without data quality evaluation
+                nonEvaluatedFilesCount++;
+                log.warn("No data quality evaluation for task: " + 
taskState.getTaskId());
+            }
+        }
+
+        // Log summary of evaluation
+        log.info("Data quality evaluation summary - Total: {}, Passed: {}, 
Failed: {}, Not Evaluated: {}",
+            totalFiles, passedFilesCount, failedFilesCount, 
nonEvaluatedFilesCount);
+        return new DataQualityEvaluationResult(jobDataQualityStatus, 
totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount);
+    }
+
+
+    private static void emitMetrics(JobState jobState, final DataQualityStatus 
jobDataQuality, final int totalFiles,
+            final int passedFilesCount, final int failedFilesCount, final int 
nonEvaluatedFilesCount, final String datasetUrn) {
+        try {
+            // Check if OpenTelemetry is enabled
+            boolean otelEnabled = 
jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
+                
ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED);
+
+            if (!otelEnabled) {
+                log.info("OpenTelemetry metrics disabled, skipping metrics 
emission");
+                return;
+            }
+
+            OpenTelemetryMetricsBase otelMetrics = 
OpenTelemetryMetrics.getInstance(jobState);
+            log.info("OpenTelemetry instance obtained: {}", otelMetrics != 
null);
+
+            if (otelMetrics != null) {
+                Attributes tags = getTagsForDataQualityMetrics(jobState, 
datasetUrn);
+                log.info("Tags for data quality metrics: " + tags.toString());
+                // Emit data quality status (1 for PASSED, 0 for FAILED)

Review Comment:
   We are emitting different metrics now, so this comment should be updated.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}
+
+    /**
+     * Result of a data quality evaluation containing the overall status and 
metrics.
+     */
+    @Getter
+    public static class DataQualityEvaluationResult {
+        private final DataQualityStatus qualityStatus;
+        private final int totalFiles;
+        private final int passedFiles;
+        private final int failedFiles;
+        // Number of files that were not evaluated for data quality for 
example files not found or not processed
+        private final int nonEvaluatedFiles;
+
+        public DataQualityEvaluationResult(DataQualityStatus qualityStatus, 
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+            this.qualityStatus = qualityStatus;
+            this.totalFiles = totalFiles;
+            this.passedFiles = passedFiles;
+            this.failedFiles = failedFiles;
+            this.nonEvaluatedFiles = nonEvaluatedFiles;
+        }
+    }
+
+    /**
+     * Evaluates the data quality of a dataset state and stores the result.
+     * This method is specifically designed for dataset-level quality 
evaluation.
+     *
+     * @param datasetState The dataset state to evaluate and update
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState 
jobState) {
+        List<TaskState> taskStates = datasetState.getTaskStates();
+        DataQualityEvaluationResult result = evaluateDataQuality(taskStates, 
jobState);
+
+        // Store the result in the dataset state
+        jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
result.getQualityStatus().name());
+        // Emit dataset-specific metrics
+        emitMetrics(jobState, result.getQualityStatus(), 
result.getTotalFiles(),
+            result.getPassedFiles(), result.getFailedFiles(), 
result.getNonEvaluatedFiles(), datasetState.getDatasetUrn());
+
+        return result;
+    }
+
+    /**
+     * Evaluates the data quality of a set of task states and emits relevant 
metrics.
+     *
+     * @param taskStates List of task states to evaluate
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+        DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED;
+        int totalFiles = 0;
+        int failedFilesCount = 0;
+        int passedFilesCount = 0;
+        int nonEvaluatedFilesCount = 0;
+
+        for (TaskState taskState : taskStates) {
+            totalFiles++;
+
+            // Handle null task states gracefully
+            if (taskState == null) {
+                log.warn("Encountered null task state, skipping data quality 
evaluation for this task");
+                nonEvaluatedFilesCount++;
+                continue;
+            }
+
+            DataQualityStatus taskDataQuality = null;
+            String result = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+            taskDataQuality = DataQualityStatus.fromString(result);
+            if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+                log.debug("Data quality status of this task is: " + 
taskDataQuality);
+                if (DataQualityStatus.PASSED == taskDataQuality) {
+                    passedFilesCount++;
+                } else if (DataQualityStatus.FAILED == taskDataQuality){
+                    failedFilesCount++;
+                    jobDataQualityStatus = DataQualityStatus.FAILED;
+                } else {
+                    log.warn("Unexpected data quality status: " + 
taskDataQuality + " for task: " + taskState.getTaskId());
+                }
+            } else {
+                // Handle files without data quality evaluation
+                nonEvaluatedFilesCount++;
+                log.warn("No data quality evaluation for task: " + 
taskState.getTaskId());
+            }
+        }
+
+        // Log summary of evaluation
+        log.info("Data quality evaluation summary - Total: {}, Passed: {}, 
Failed: {}, Not Evaluated: {}",
+            totalFiles, passedFilesCount, failedFilesCount, 
nonEvaluatedFilesCount);
+        return new DataQualityEvaluationResult(jobDataQualityStatus, 
totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount);
+    }
+
+
+    private static void emitMetrics(JobState jobState, final DataQualityStatus 
jobDataQuality, final int totalFiles,
+            final int passedFilesCount, final int failedFilesCount, final int 
nonEvaluatedFilesCount, final String datasetUrn) {
+        try {
+            // Check if OpenTelemetry is enabled
+            boolean otelEnabled = 
jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
+                
ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED);
+
+            if (!otelEnabled) {
+                log.info("OpenTelemetry metrics disabled, skipping metrics 
emission");
+                return;
+            }
+
+            OpenTelemetryMetricsBase otelMetrics = 
OpenTelemetryMetrics.getInstance(jobState);
+            log.info("OpenTelemetry instance obtained: {}", otelMetrics != 
null);
+
+            if (otelMetrics != null) {
+                Attributes tags = getTagsForDataQualityMetrics(jobState, 
datasetUrn);
+                log.info("Tags for data quality metrics: " + tags.toString());
+                // Emit data quality status (1 for PASSED, 0 for FAILED)
+                log.info("Data quality status for this job is " + 
jobDataQuality);

Review Comment:
   We can combine the multiple logging statements into one:
   ```
   log.info("Emitting DQ metrics for job={}, status={}, tags={}", 
jobState.getJobName(), jobDataQuality, tags);
   ```



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java:
##########
@@ -613,6 +629,11 @@ private boolean checkDataQuality(Optional<Object> schema)
       TaskLevelPolicyCheckResults taskResults =
           this.taskContext.getTaskLevelPolicyChecker(this.forkTaskState, 
this.branches > 1 ? this.index : -1)
               .executePolicies();
+      boolean allRequiredPoliciesPassed = 
taskResults.getPolicyResults().entrySet().stream()
+          .filter(e -> e.getValue() == TaskLevelPolicy.Type.FAIL)
+          .allMatch(e -> e.getKey() == TaskLevelPolicy.Result.PASSED);
+      forkTaskState.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY,
+          allRequiredPoliciesPassed ? DataQualityStatus.PASSED.name() : 
DataQualityStatus.FAILED.name());

Review Comment:
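   We want to mark the DQ status as FAILED if any policy fails, right? If so, shouldn't we check that there is no `Result.FAILED` key and that a `Result.PASSED` key is present, and only then mark the DQ status as PASSED? (Note that `allMatch` returns true on an empty stream, so with the current filter a task with no FAIL-type policy results is marked PASSED.)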



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}
+
+    /**
+     * Immutable outcome of a data quality evaluation: the overall status together with per-file counts.
+     */
+    @Getter
+    public static class DataQualityEvaluationResult {
+        /** Overall data quality status of the evaluated set of files. */
+        private final DataQualityStatus qualityStatus;
+        /** Total number of files considered. */
+        private final int totalFiles;
+        /** Number of files whose data quality check passed. */
+        private final int passedFiles;
+        /** Number of files whose data quality check failed. */
+        private final int failedFiles;
+        /** Number of files that were not evaluated for data quality, for example files not found or not processed. */
+        private final int nonEvaluatedFiles;
+
+        public DataQualityEvaluationResult(DataQualityStatus qualityStatus, int totalFiles, int passedFiles,
+            int failedFiles, int nonEvaluatedFiles) {
+            this.nonEvaluatedFiles = nonEvaluatedFiles;
+            this.failedFiles = failedFiles;
+            this.passedFiles = passedFiles;
+            this.totalFiles = totalFiles;
+            this.qualityStatus = qualityStatus;
+        }
+    }
+
+    /**
+     * Evaluates the data quality of a dataset state and stores the result.
+     * This method is specifically designed for dataset-level quality 
evaluation.
+     *
+     * @param datasetState The dataset state to evaluate and update
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState 
jobState) {
+        List<TaskState> taskStates = datasetState.getTaskStates();
+        DataQualityEvaluationResult result = evaluateDataQuality(taskStates, 
jobState);
+
+        // Store the result in the dataset state
+        jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
result.getQualityStatus().name());
+        // Emit dataset-specific metrics
+        emitMetrics(jobState, result.getQualityStatus(), 
result.getTotalFiles(),
+            result.getPassedFiles(), result.getFailedFiles(), 
result.getNonEvaluatedFiles(), datasetState.getDatasetUrn());
+
+        return result;
+    }
+
+    /**
+     * Evaluates the data quality of a set of task states and aggregates the per-task results.
+     *
+     * <p>Each task state is classified by its {@link ConfigurationKeys#TASK_LEVEL_POLICY_RESULT_KEY} property.
+     * PASSED and FAILED results are counted as such. Null task states, NOT_EVALUATED results and any unexpected
+     * status are counted as non-evaluated, so passed + failed + non-evaluated always equals the total. The overall
+     * status is FAILED if at least one task failed, otherwise PASSED (including when nothing was evaluated).
+     * This method only computes and logs the result; it does not emit metrics.
+     *
+     * @param taskStates List of task states to evaluate; null elements are tolerated
+     * @param jobState The job state containing additional context (not read by this method)
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+        DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED;
+        int totalFiles = 0;
+        int failedFilesCount = 0;
+        int passedFilesCount = 0;
+        int nonEvaluatedFilesCount = 0;
+
+        for (TaskState taskState : taskStates) {
+            totalFiles++;
+
+            // Handle null task states gracefully
+            if (taskState == null) {
+                log.warn("Encountered null task state, skipping data quality evaluation for this task");
+                nonEvaluatedFilesCount++;
+                continue;
+            }
+
+            String result = taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+            DataQualityStatus taskDataQuality = DataQualityStatus.fromString(result);
+            if (taskDataQuality == DataQualityStatus.NOT_EVALUATED) {
+                // Handle files without data quality evaluation
+                nonEvaluatedFilesCount++;
+                log.warn("No data quality evaluation for task: {}", taskState.getTaskId());
+                continue;
+            }
+
+            log.debug("Data quality status of this task is: {}", taskDataQuality);
+            if (taskDataQuality == DataQualityStatus.PASSED) {
+                passedFilesCount++;
+            } else if (taskDataQuality == DataQualityStatus.FAILED) {
+                failedFilesCount++;
+                jobDataQualityStatus = DataQualityStatus.FAILED;
+            } else {
+                // Count unexpected statuses as non-evaluated so the three buckets always add up to totalFiles.
+                nonEvaluatedFilesCount++;
+                log.warn("Unexpected data quality status: {} for task: {}", taskDataQuality, taskState.getTaskId());
+            }
+        }
+
+        // Log summary of evaluation
+        log.info("Data quality evaluation summary - Total: {}, Passed: {}, Failed: {}, Not Evaluated: {}",
+            totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount);
+        return new DataQualityEvaluationResult(jobDataQualityStatus, totalFiles, passedFilesCount, failedFilesCount,
+            nonEvaluatedFilesCount);
+    }
+
+
+    private static void emitMetrics(JobState jobState, final DataQualityStatus 
jobDataQuality, final int totalFiles,
+            final int passedFilesCount, final int failedFilesCount, final int 
nonEvaluatedFilesCount, final String datasetUrn) {
+        try {
+            // Check if OpenTelemetry is enabled
+            boolean otelEnabled = 
jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
+                
ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED);
+
+            if (!otelEnabled) {
+                log.info("OpenTelemetry metrics disabled, skipping metrics 
emission");
+                return;
+            }
+
+            OpenTelemetryMetricsBase otelMetrics = 
OpenTelemetryMetrics.getInstance(jobState);
+            log.info("OpenTelemetry instance obtained: {}", otelMetrics != 
null);
+
+            if (otelMetrics != null) {

Review Comment:
   We can check `if (otelMetrics == null)` and return early. This simplifies the
rest of the method by reducing indentation and nesting, making the main logic
easier to follow.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 977040)
    Time Spent: 1h 40m  (was: 1.5h)

> FileSize Data Quality implementation for FileBasedCopy
> ------------------------------------------------------
>
>                 Key: GOBBLIN-2204
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2204
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Vaibhav Singhal
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to