[ 
https://issues.apache.org/jira/browse/GOBBLIN-2204?focusedWorklogId=975539&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-975539
 ]

ASF GitHub Bot logged work on GOBBLIN-2204:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Jul/25 20:57
            Start Date: 21/Jul/25 20:57
    Worklog Time Spent: 10m 
      Work Description: khandelwal-prateek commented on code in PR #4113:
URL: https://github.com/apache/gobblin/pull/4113#discussion_r2220106656


##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written 
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+  public static final String COPY_PREFIX = "gobblin.copy";
+  public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+  public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+  public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+    super(state, type);
+  }
+
+  @Override
+  public Result executePolicy() {
+    TransferBytes bytes = getBytesReadAndWritten(this.state);
+    if (bytes == null) {
+      return Result.FAILED;
+    }
+    Long bytesRead = bytes.getBytesRead();
+    Long bytesWritten = bytes.getBytesWritten();
+
+    if(bytesRead == null || bytesWritten == null) {

Review Comment:
   nit: add space after `if` 



##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written 
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+  public static final String COPY_PREFIX = "gobblin.copy";
+  public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+  public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+  public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+    super(state, type);
+  }
+
+  @Override
+  public Result executePolicy() {
+    TransferBytes bytes = getBytesReadAndWritten(this.state);
+    if (bytes == null) {
+      return Result.FAILED;
+    }
+    Long bytesRead = bytes.getBytesRead();
+    Long bytesWritten = bytes.getBytesWritten();
+
+    if(bytesRead == null || bytesWritten == null) {
+      log.error("Missing value(s): bytesRead={}, bytesWritten={}", bytesRead, 
bytesWritten);
+      return Result.FAILED;
+    }
+    Long sizeDifference = Math.abs(bytesRead - bytesWritten);
+
+    if (sizeDifference == 0) {
+      return Result.PASSED;
+    }
+
+    log.warn("File size check failed - bytes read: {}, bytes written: {}, 
difference: {}",
+        bytesRead, bytesWritten, sizeDifference);
+    return Result.FAILED;
+  }
+
+  @Override
+  public String toString() {
+    TransferBytes bytes = getBytesReadAndWritten(this.state);
+    return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]", 
bytes.getBytesRead(), bytes.getBytesWritten());
+  }
+
+  /**
+   * Helper class to hold transfer bytes information
+   */
+  @Getter
+  private static class TransferBytes {
+    final Long bytesRead;
+    final Long bytesWritten;
+    TransferBytes(Long bytesRead, Long bytesWritten) {
+      this.bytesRead = bytesRead;
+      this.bytesWritten = bytesWritten;
+    }
+  }
+
+  /**
+   * Extracts bytesRead and bytesWritten from the given state.
+   * Returns null if parsing fails.
+   */
+  private TransferBytes getBytesReadAndWritten(State state) {

Review Comment:
   We can return `Optional<TransferBytes>` from this instead of relying on null 
or constructing a partially invalid object. Optional clearly expresses the 
possibility of absence and avoids null-filled objects like `TransferBytes(null, 
null)`, making the failure path more explicit for the caller



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}
+
+    /**
+     * Result of a data quality evaluation containing the overall status and 
metrics.
+     */
+    @Getter
+    public static class DataQualityEvaluationResult {
+        private final DataQualityStatus qualityStatus;
+        private final int totalFiles;
+        private final int passedFiles;
+        private final int failedFiles;
+        // Number of files that were not evaluated for data quality for 
example files not found or not processed
+        private final int nonEvaluatedFiles;
+
+        public DataQualityEvaluationResult(DataQualityStatus qualityStatus, 
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+            this.qualityStatus = qualityStatus;
+            this.totalFiles = totalFiles;
+            this.passedFiles = passedFiles;
+            this.failedFiles = failedFiles;
+            this.nonEvaluatedFiles = nonEvaluatedFiles;
+        }
+    }
+
+    /**
+     * Evaluates the data quality of a dataset state and stores the result.
+     * This method is specifically designed for dataset-level quality 
evaluation.
+     *
+     * @param datasetState The dataset state to evaluate and update
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState 
jobState) {
+        List<TaskState> taskStates = datasetState.getTaskStates();
+        DataQualityEvaluationResult result = evaluateDataQuality(taskStates, 
jobState);
+
+        // Store the result in the dataset state
+        jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
result.getQualityStatus().name());
+        // Emit dataset-specific metrics
+        emitMetrics(jobState, result.getQualityStatus() == 
DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(),
+            result.getPassedFiles(), result.getFailedFiles(), 
result.nonEvaluatedFiles, datasetState.getDatasetUrn());
+
+        return result;
+    }
+
+    /**
+     * Evaluates the data quality of a set of task states and emits relevant 
metrics.
+     *
+     * @param taskStates List of task states to evaluate
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+        DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+        int totalFiles = 0;
+        int failedFilesSize = 0;

Review Comment:
   rename `faliedFilesCount -> failedFilesCount` to better reflect that these 
are counts and match naming in metrics emission



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}
+
+    /**
+     * Result of a data quality evaluation containing the overall status and 
metrics.
+     */
+    @Getter
+    public static class DataQualityEvaluationResult {
+        private final DataQualityStatus qualityStatus;
+        private final int totalFiles;
+        private final int passedFiles;
+        private final int failedFiles;
+        // Number of files that were not evaluated for data quality for 
example files not found or not processed
+        private final int nonEvaluatedFiles;
+
+        public DataQualityEvaluationResult(DataQualityStatus qualityStatus, 
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+            this.qualityStatus = qualityStatus;
+            this.totalFiles = totalFiles;
+            this.passedFiles = passedFiles;
+            this.failedFiles = failedFiles;
+            this.nonEvaluatedFiles = nonEvaluatedFiles;
+        }
+    }
+
+    /**
+     * Evaluates the data quality of a dataset state and stores the result.
+     * This method is specifically designed for dataset-level quality 
evaluation.
+     *
+     * @param datasetState The dataset state to evaluate and update
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState 
jobState) {
+        List<TaskState> taskStates = datasetState.getTaskStates();
+        DataQualityEvaluationResult result = evaluateDataQuality(taskStates, 
jobState);
+
+        // Store the result in the dataset state
+        jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
result.getQualityStatus().name());
+        // Emit dataset-specific metrics
+        emitMetrics(jobState, result.getQualityStatus() == 
DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(),
+            result.getPassedFiles(), result.getFailedFiles(), 
result.nonEvaluatedFiles, datasetState.getDatasetUrn());
+
+        return result;
+    }
+
+    /**
+     * Evaluates the data quality of a set of task states and emits relevant 
metrics.
+     *
+     * @param taskStates List of task states to evaluate
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+        DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+        int totalFiles = 0;
+        int failedFilesSize = 0;
+        int passedFilesSize = 0;
+        int nonEvaluatedFilesSize = 0;
+
+        for (TaskState taskState : taskStates) {
+            totalFiles++;
+
+            // Handle null task states gracefully
+            if (taskState == null) {
+                log.warn("Encountered null task state, skipping data quality 
evaluation for this task");
+                nonEvaluatedFilesSize++;
+                continue;
+            }
+
+            DataQualityStatus taskDataQuality = null;
+            String result = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+            taskDataQuality = DataQualityStatus.fromString(result);
+            if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+                log.debug("Data quality status of this task is: " + 
taskDataQuality);
+                if (DataQualityStatus.PASSED == taskDataQuality) {
+                    passedFilesSize++;
+                } else if (DataQualityStatus.FAILED == taskDataQuality){
+                    failedFilesSize++;
+                    jobDataQuality = DataQualityStatus.FAILED;
+                }
+            } else {
+                // Handle files without data quality evaluation
+                nonEvaluatedFilesSize++;
+                log.warn("No data quality evaluation for task: " + 
taskState.getTaskId());
+            }
+        }
+
+        // Log summary of evaluation
+        log.info("Data quality evaluation summary - Total: {}, Passed: {}, 
Failed: {}, Not Evaluated: {}",
+            totalFiles, passedFilesSize, failedFilesSize, 
nonEvaluatedFilesSize);
+        return new DataQualityEvaluationResult(jobDataQuality, totalFiles, 
passedFilesSize, failedFilesSize, nonEvaluatedFilesSize);
+    }
+
+    private static void emitMetrics(JobState jobState, int jobDataQuality, int 
totalFiles,
+            int passedFilesSize, int failedFilesSize, int 
nonEvaluatedFilesSize, String datasetUrn) {
+        OpenTelemetryMetricsBase otelMetrics = 
OpenTelemetryMetrics.getInstance(jobState);
+        if(otelMetrics != null) {
+            Meter meter = 
otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME);
+            AtomicLong jobDataQualityRef = new AtomicLong(jobDataQuality);
+            AtomicLong totalFilesRef = new AtomicLong(totalFiles);
+            AtomicLong passedFilesRef = new AtomicLong(passedFilesSize);
+            AtomicLong failedFilesRef = new AtomicLong(failedFilesSize);
+            AtomicLong nonEvaluatedFilesRef = new 
AtomicLong(nonEvaluatedFilesSize);

Review Comment:
   Any reason to use `AtomicLong`? It should not be needed since the metric 
values are immutable snapshots and not being mutated or shared across threads



##########
gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/DataQualityStatus.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.qualitychecker;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * An enumeration for possible statuses for Data quality checks.
+ * Its values will be:
+ * - PASSED: When all data quality checks pass
+ * - FAILED: When any data quality check fails
+ * - NOT_EVALUATED: When data quality check evaluation is not performed

Review Comment:
   add javadoc for `UNKNOWN`



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}
+
+    /**
+     * Result of a data quality evaluation containing the overall status and 
metrics.
+     */
+    @Getter
+    public static class DataQualityEvaluationResult {
+        private final DataQualityStatus qualityStatus;
+        private final int totalFiles;
+        private final int passedFiles;
+        private final int failedFiles;
+        // Number of files that were not evaluated for data quality for 
example files not found or not processed
+        private final int nonEvaluatedFiles;
+
+        public DataQualityEvaluationResult(DataQualityStatus qualityStatus, 
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+            this.qualityStatus = qualityStatus;
+            this.totalFiles = totalFiles;
+            this.passedFiles = passedFiles;
+            this.failedFiles = failedFiles;
+            this.nonEvaluatedFiles = nonEvaluatedFiles;
+        }
+    }
+
+    /**
+     * Evaluates the data quality of a dataset state and stores the result.
+     * This method is specifically designed for dataset-level quality 
evaluation.
+     *
+     * @param datasetState The dataset state to evaluate and update
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState 
jobState) {
+        List<TaskState> taskStates = datasetState.getTaskStates();
+        DataQualityEvaluationResult result = evaluateDataQuality(taskStates, 
jobState);
+
+        // Store the result in the dataset state
+        jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
result.getQualityStatus().name());
+        // Emit dataset-specific metrics
+        emitMetrics(jobState, result.getQualityStatus() == 
DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(),

Review Comment:
   `DATA_QUALITY_STATUS_METRIC_NAME` is defined as a counter, but adding 0 for 
failures doesn’t emit anything, so we lose visibility into failed flows 
completely. To fix this, please split into two explicit counters: one for 
success and one for failure to ensure both outcomes are observable and can be 
used to setup alerts.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}
+
+    /**
+     * Result of a data quality evaluation containing the overall status and 
metrics.
+     */
+    @Getter
+    public static class DataQualityEvaluationResult {
+        private final DataQualityStatus qualityStatus;
+        private final int totalFiles;
+        private final int passedFiles;
+        private final int failedFiles;
+        // Number of files that were not evaluated for data quality for 
example files not found or not processed
+        private final int nonEvaluatedFiles;
+
+        public DataQualityEvaluationResult(DataQualityStatus qualityStatus, 
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+            this.qualityStatus = qualityStatus;
+            this.totalFiles = totalFiles;
+            this.passedFiles = passedFiles;
+            this.failedFiles = failedFiles;
+            this.nonEvaluatedFiles = nonEvaluatedFiles;
+        }
+    }
+
+    /**
+     * Evaluates the data quality of a dataset state and stores the result.
+     * This method is specifically designed for dataset-level quality 
evaluation.
+     *
+     * @param datasetState The dataset state to evaluate and update
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState 
jobState) {
+        List<TaskState> taskStates = datasetState.getTaskStates();
+        DataQualityEvaluationResult result = evaluateDataQuality(taskStates, 
jobState);
+
+        // Store the result in the dataset state
+        jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
result.getQualityStatus().name());
+        // Emit dataset-specific metrics
+        emitMetrics(jobState, result.getQualityStatus() == 
DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(),
+            result.getPassedFiles(), result.getFailedFiles(), 
result.nonEvaluatedFiles, datasetState.getDatasetUrn());
+
+        return result;
+    }
+
+    /**
+     * Evaluates the data quality of a set of task states and emits relevant 
metrics.
+     *
+     * @param taskStates List of task states to evaluate
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+        DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+        int totalFiles = 0;
+        int failedFilesSize = 0;
+        int passedFilesSize = 0;
+        int nonEvaluatedFilesSize = 0;
+
+        for (TaskState taskState : taskStates) {
+            totalFiles++;
+
+            // Handle null task states gracefully
+            if (taskState == null) {
+                log.warn("Encountered null task state, skipping data quality 
evaluation for this task");
+                nonEvaluatedFilesSize++;

Review Comment:
   when is `taskStatus: null`.. should that be counted as DQ failure status?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}
+
+    /**
+     * Result of a data quality evaluation containing the overall status and 
metrics.
+     */
+    @Getter
+    public static class DataQualityEvaluationResult {
+        private final DataQualityStatus qualityStatus;
+        private final int totalFiles;
+        private final int passedFiles;
+        private final int failedFiles;
+        // Number of files that were not evaluated for data quality for 
example files not found or not processed
+        private final int nonEvaluatedFiles;
+
+        public DataQualityEvaluationResult(DataQualityStatus qualityStatus, 
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+            this.qualityStatus = qualityStatus;
+            this.totalFiles = totalFiles;
+            this.passedFiles = passedFiles;
+            this.failedFiles = failedFiles;
+            this.nonEvaluatedFiles = nonEvaluatedFiles;
+        }
+    }
+
+    /**
+     * Evaluates the data quality of a dataset state and stores the result.
+     * This method is specifically designed for dataset-level quality 
evaluation.
+     *
+     * @param datasetState The dataset state to evaluate and update
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState 
jobState) {
+        List<TaskState> taskStates = datasetState.getTaskStates();
+        DataQualityEvaluationResult result = evaluateDataQuality(taskStates, 
jobState);
+
+        // Store the result in the dataset state
+        jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
result.getQualityStatus().name());
+        // Emit dataset-specific metrics
+        emitMetrics(jobState, result.getQualityStatus() == 
DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(),
+            result.getPassedFiles(), result.getFailedFiles(), 
result.nonEvaluatedFiles, datasetState.getDatasetUrn());
+
+        return result;
+    }
+
+    /**
+     * Evaluates the data quality of a set of task states and emits relevant 
metrics.
+     *
+     * @param taskStates List of task states to evaluate
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+        DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+        int totalFiles = 0;
+        int failedFilesSize = 0;
+        int passedFilesSize = 0;
+        int nonEvaluatedFilesSize = 0;
+
+        for (TaskState taskState : taskStates) {
+            totalFiles++;
+
+            // Handle null task states gracefully
+            if (taskState == null) {
+                log.warn("Encountered null task state, skipping data quality 
evaluation for this task");
+                nonEvaluatedFilesSize++;
+                continue;
+            }
+
+            DataQualityStatus taskDataQuality = null;
+            String result = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+            taskDataQuality = DataQualityStatus.fromString(result);
+            if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+                log.debug("Data quality status of this task is: " + 
taskDataQuality);
+                if (DataQualityStatus.PASSED == taskDataQuality) {
+                    passedFilesSize++;
+                } else if (DataQualityStatus.FAILED == taskDataQuality){
+                    failedFilesSize++;
+                    jobDataQuality = DataQualityStatus.FAILED;
+                }
+            } else {

Review Comment:
   in what all cases would taskDataQuality be `NOT_EVALUATED`



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}
+
+    /**
+     * Result of a data quality evaluation containing the overall status and 
metrics.
+     */
+    @Getter
+    public static class DataQualityEvaluationResult {
+        private final DataQualityStatus qualityStatus;
+        private final int totalFiles;
+        private final int passedFiles;
+        private final int failedFiles;
+        // Number of files that were not evaluated for data quality for 
example files not found or not processed
+        private final int nonEvaluatedFiles;
+
+        public DataQualityEvaluationResult(DataQualityStatus qualityStatus, 
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+            this.qualityStatus = qualityStatus;
+            this.totalFiles = totalFiles;
+            this.passedFiles = passedFiles;
+            this.failedFiles = failedFiles;
+            this.nonEvaluatedFiles = nonEvaluatedFiles;
+        }
+    }
+
+    /**
+     * Evaluates the data quality of a dataset state and stores the result.
+     * This method is specifically designed for dataset-level quality 
evaluation.
+     *
+     * @param datasetState The dataset state to evaluate and update
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState 
jobState) {
+        List<TaskState> taskStates = datasetState.getTaskStates();
+        DataQualityEvaluationResult result = evaluateDataQuality(taskStates, 
jobState);
+
+        // Store the result in the dataset state
+        jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
result.getQualityStatus().name());
+        // Emit dataset-specific metrics
+        emitMetrics(jobState, result.getQualityStatus() == 
DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(),
+            result.getPassedFiles(), result.getFailedFiles(), 
result.nonEvaluatedFiles, datasetState.getDatasetUrn());
+
+        return result;
+    }
+
+    /**
+     * Evaluates the data quality of a set of task states and emits relevant 
metrics.
+     *
+     * @param taskStates List of task states to evaluate
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+        DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+        int totalFiles = 0;
+        int failedFilesSize = 0;
+        int passedFilesSize = 0;
+        int nonEvaluatedFilesSize = 0;
+
+        for (TaskState taskState : taskStates) {
+            totalFiles++;
+
+            // Handle null task states gracefully
+            if (taskState == null) {
+                log.warn("Encountered null task state, skipping data quality 
evaluation for this task");
+                nonEvaluatedFilesSize++;
+                continue;
+            }
+
+            DataQualityStatus taskDataQuality = null;
+            String result = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+            taskDataQuality = DataQualityStatus.fromString(result);
+            if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+                log.debug("Data quality status of this task is: " + 
taskDataQuality);
+                if (DataQualityStatus.PASSED == taskDataQuality) {
+                    passedFilesSize++;
+                } else if (DataQualityStatus.FAILED == taskDataQuality){
+                    failedFilesSize++;
+                    jobDataQuality = DataQualityStatus.FAILED;
+                }

Review Comment:
   add a warn log when DataQualityStatus.fromString() returns UNKNOWN



##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written 
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+  public static final String COPY_PREFIX = "gobblin.copy";
+  public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+  public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+  public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+    super(state, type);
+  }
+
+  @Override
+  public Result executePolicy() {
+    TransferBytes bytes = getBytesReadAndWritten(this.state);
+    if (bytes == null) {
+      return Result.FAILED;
+    }
+    Long bytesRead = bytes.getBytesRead();
+    Long bytesWritten = bytes.getBytesWritten();
+
+    if(bytesRead == null || bytesWritten == null) {

Review Comment:
   We should avoid `new TransferBytes(null, null)`, as it creates a seemingly 
valid object that requires downstream null checks. We can instead log any error 
within `getBytesReadAndWritten` and return an `Optional.empty()` for any 
failure case.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}
+
+    /**
+     * Result of a data quality evaluation containing the overall status and 
metrics.
+     */
+    @Getter
+    public static class DataQualityEvaluationResult {
+        private final DataQualityStatus qualityStatus;
+        private final int totalFiles;
+        private final int passedFiles;
+        private final int failedFiles;
+        // Number of files that were not evaluated for data quality for 
example files not found or not processed
+        private final int nonEvaluatedFiles;
+
+        public DataQualityEvaluationResult(DataQualityStatus qualityStatus, 
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+            this.qualityStatus = qualityStatus;
+            this.totalFiles = totalFiles;
+            this.passedFiles = passedFiles;
+            this.failedFiles = failedFiles;
+            this.nonEvaluatedFiles = nonEvaluatedFiles;
+        }
+    }
+
+    /**
+     * Evaluates the data quality of a dataset state and stores the result.
+     * This method is specifically designed for dataset-level quality 
evaluation.
+     *
+     * @param datasetState The dataset state to evaluate and update
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState 
jobState) {
+        List<TaskState> taskStates = datasetState.getTaskStates();
+        DataQualityEvaluationResult result = evaluateDataQuality(taskStates, 
jobState);
+
+        // Store the result in the dataset state
+        jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
result.getQualityStatus().name());
+        // Emit dataset-specific metrics
+        emitMetrics(jobState, result.getQualityStatus() == 
DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(),

Review Comment:
   The inline condition (status == PASSED ? 1 : 0) leaks metric encoding logic 
at the caller. We can move this into the DataQualityStatus enum (eg. 
toMetricValue()), or encapsulating it inside emitMetrics()



##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+    private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = 
"gobblin.gaas.observability";
+
+    // Private constructor to prevent instantiation
+    private DataQualityEvaluator() {}
+
+    /**
+     * Result of a data quality evaluation containing the overall status and 
metrics.
+     */
+    @Getter
+    public static class DataQualityEvaluationResult {
+        private final DataQualityStatus qualityStatus;
+        private final int totalFiles;
+        private final int passedFiles;
+        private final int failedFiles;
+        // Number of files that were not evaluated for data quality for 
example files not found or not processed
+        private final int nonEvaluatedFiles;
+
+        public DataQualityEvaluationResult(DataQualityStatus qualityStatus, 
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+            this.qualityStatus = qualityStatus;
+            this.totalFiles = totalFiles;
+            this.passedFiles = passedFiles;
+            this.failedFiles = failedFiles;
+            this.nonEvaluatedFiles = nonEvaluatedFiles;
+        }
+    }
+
+    /**
+     * Evaluates the data quality of a dataset state and stores the result.
+     * This method is specifically designed for dataset-level quality 
evaluation.
+     *
+     * @param datasetState The dataset state to evaluate and update
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState 
jobState) {
+        List<TaskState> taskStates = datasetState.getTaskStates();
+        DataQualityEvaluationResult result = evaluateDataQuality(taskStates, 
jobState);
+
+        // Store the result in the dataset state
+        jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
result.getQualityStatus().name());
+        // Emit dataset-specific metrics
+        emitMetrics(jobState, result.getQualityStatus() == 
DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(),
+            result.getPassedFiles(), result.getFailedFiles(), 
result.nonEvaluatedFiles, datasetState.getDatasetUrn());
+
+        return result;
+    }
+
+    /**
+     * Evaluates the data quality of a set of task states and emits relevant 
metrics.
+     *
+     * @param taskStates List of task states to evaluate
+     * @param jobState The job state containing additional context
+     * @return DataQualityEvaluationResult containing the evaluation results
+     */
+    public static DataQualityEvaluationResult 
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+        DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;

Review Comment:
   should we rename the variable to `jobDataQualityStatus`?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 975539)
    Time Spent: 1h  (was: 50m)

> FileSize Data Quality implementation for FileBasedCopy
> ------------------------------------------------------
>
>                 Key: GOBBLIN-2204
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2204
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Vaibhav Singhal
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to