[ https://issues.apache.org/jira/browse/GOBBLIN-2204?focusedWorklogId=977040&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-977040 ]
ASF GitHub Bot logged work on GOBBLIN-2204: ------------------------------------------- Author: ASF GitHub Bot Created on: 31/Jul/25 05:19 Start Date: 31/Jul/25 05:19 Worklog Time Spent: 10m Work Description: khandelwal-prateek commented on code in PR #4113: URL: https://github.com/apache/gobblin/pull/4113#discussion_r2243350309 ########## gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.policies.size; + +import java.util.Optional; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy; + +/** + * A task-level policy that checks if the bytes read matches the bytes written for a file copy operation. 
+ */ +@Slf4j +public class FileSizePolicy extends TaskLevelPolicy { + + public static final String COPY_PREFIX = "gobblin.copy"; + public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead"; + public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten"; + + public FileSizePolicy(State state, TaskLevelPolicy.Type type) { + super(state, type); + } + + @Override + public Result executePolicy() { + Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state); + if (!bytes.isPresent()) { + return Result.FAILED; + } + Long bytesRead = bytes.get().getBytesRead(); + Long bytesWritten = bytes.get().getBytesWritten(); + + Long sizeDifference = Math.abs(bytesRead - bytesWritten); + + if (sizeDifference == 0) { + return Result.PASSED; + } + + log.warn("File size check failed - bytes read: {}, bytes written: {}, difference: {}", + bytesRead, bytesWritten, sizeDifference); + return Result.FAILED; + } + + @Override + public String toString() { + Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state); + if(bytes.isPresent()) { + return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]", bytes.get().getBytesRead(), bytes.get().getBytesWritten()); + } else{ + return "FileSizePolicy [bytesRead=null, bytesWritten=null]"; + } + } + + /** + * Helper class to hold transfer bytes information + */ + @Getter + private static class TransferBytes { + final Long bytesRead; + final Long bytesWritten; + TransferBytes(Long bytesRead, Long bytesWritten) { + this.bytesRead = bytesRead; + this.bytesWritten = bytesWritten; + } + } + + /** + * Extracts bytesRead and bytesWritten from the given state. + * Returns null if parsing fails. Review Comment: update javadoc wrt returning null ########## gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyCheckerTest.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.qualitychecker.task; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.qualitychecker.TestTaskLevelPolicy; + +@Test +public class TaskLevelPolicyCheckerTest { + + @Test + public void testSinglePolicyPassed() { + // Create a state with a single policy that always passes + State state = new State(); + List<TaskLevelPolicy> policies = new ArrayList<>(); + policies.add(new TestTaskLevelPolicy(state, TaskLevelPolicy.Type.FAIL)); + + // Create checker and execute policies + TaskLevelPolicyChecker checker = new TaskLevelPolicyChecker(policies); + TaskLevelPolicyCheckResults results = checker.executePolicies(); + + // Verify results + Assert.assertEquals(results.getPolicyResults().size(), 1); + for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : results.getPolicyResults().entrySet()) { + Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.PASSED); + Assert.assertEquals(entry.getValue(), TaskLevelPolicy.Type.FAIL); + } + } + + @Test + public void testSinglePolicyFailed() { + // Create a state with a single policy that always fails + State state = new State(); + 
List<TaskLevelPolicy> policies = new ArrayList<>(); + policies.add(new FailingTaskLevelPolicy(state, TaskLevelPolicy.Type.FAIL)); + + // Create checker and execute policies + TaskLevelPolicyChecker checker = new TaskLevelPolicyChecker(policies); + TaskLevelPolicyCheckResults results = checker.executePolicies(); + + // Verify results + Assert.assertEquals(results.getPolicyResults().size(), 1); + for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : results.getPolicyResults().entrySet()) { + Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.FAILED); + Assert.assertEquals(entry.getValue(), TaskLevelPolicy.Type.FAIL); + } + } + + @Test + public void testMultiplePoliciesMixedResults() { + // Create a state with multiple policies having mixed results + State state = new State(); + List<TaskLevelPolicy> policies = new ArrayList<>(); + policies.add(new TestTaskLevelPolicy(state, TaskLevelPolicy.Type.FAIL)); // Passes + policies.add(new FailingTaskLevelPolicy(state, TaskLevelPolicy.Type.FAIL)); // Fails + policies.add(new TestTaskLevelPolicy(state, TaskLevelPolicy.Type.OPTIONAL)); // Passes + + // Create checker and execute policies + TaskLevelPolicyChecker checker = new TaskLevelPolicyChecker(policies); + TaskLevelPolicyCheckResults results = checker.executePolicies(); + + // Verify results + Assert.assertEquals(results.getPolicyResults().size(), 2); + int passedCount = 0; + int failedCount = 0; + for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : results.getPolicyResults().entrySet()) { + if (entry.getKey() == TaskLevelPolicy.Result.PASSED) { + passedCount++; + } else { + failedCount++; + } + } + Assert.assertEquals(passedCount, 1); + Assert.assertEquals(failedCount, 1); + } + + @Test + public void testOptionalPolicyFailure() { + // Create a state with an optional policy that fails + State state = new State(); + List<TaskLevelPolicy> policies = new ArrayList<>(); + policies.add(new FailingTaskLevelPolicy(state, 
TaskLevelPolicy.Type.OPTIONAL)); + + // Create checker and execute policies + TaskLevelPolicyChecker checker = new TaskLevelPolicyChecker(policies); + TaskLevelPolicyCheckResults results = checker.executePolicies(); + + // Verify results + Assert.assertEquals(results.getPolicyResults().size(), 1); + for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry : results.getPolicyResults().entrySet()) { + Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.FAILED); + Assert.assertEquals(entry.getValue(), TaskLevelPolicy.Type.OPTIONAL); + } + } + + // Helper class for testing failing policies + private static class FailingTaskLevelPolicy extends TaskLevelPolicy { + public FailingTaskLevelPolicy(State state, Type type) { + super(state, type); + } + + @Override + public Result executePolicy() { + return Result.FAILED; + } + } +} Review Comment: nit: add end of line ########## gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.quality; + +import io.opentelemetry.api.common.Attributes; +import java.util.List; +import java.util.Properties; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.OpenTelemetryMetrics; +import org.apache.gobblin.metrics.OpenTelemetryMetricsBase; +import org.apache.gobblin.metrics.ServiceMetricNames; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.qualitychecker.DataQualityStatus; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.service.ServiceConfigKeys; + +/** + * Evaluates data quality for a set of task states and emits relevant metrics. + * This is a stateless utility class. + */ +@Slf4j +public class DataQualityEvaluator { + + private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = "gobblin.gaas.observability"; + + // Private constructor to prevent instantiation + private DataQualityEvaluator() {} + + /** + * Result of a data quality evaluation containing the overall status and metrics. 
+ */ + @Getter + public static class DataQualityEvaluationResult { + private final DataQualityStatus qualityStatus; + private final int totalFiles; + private final int passedFiles; + private final int failedFiles; + // Number of files that were not evaluated for data quality for example files not found or not processed + private final int nonEvaluatedFiles; + + public DataQualityEvaluationResult(DataQualityStatus qualityStatus, int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) { + this.qualityStatus = qualityStatus; + this.totalFiles = totalFiles; + this.passedFiles = passedFiles; + this.failedFiles = failedFiles; + this.nonEvaluatedFiles = nonEvaluatedFiles; + } Review Comment: this can be replaced with `@AllArgsConstructor` annotation on class ########## gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.quality; + +import io.opentelemetry.api.common.Attributes; +import java.util.List; +import java.util.Properties; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.OpenTelemetryMetrics; +import org.apache.gobblin.metrics.OpenTelemetryMetricsBase; +import org.apache.gobblin.metrics.ServiceMetricNames; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.qualitychecker.DataQualityStatus; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.service.ServiceConfigKeys; + +/** + * Evaluates data quality for a set of task states and emits relevant metrics. + * This is a stateless utility class. + */ +@Slf4j +public class DataQualityEvaluator { + + private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = "gobblin.gaas.observability"; + + // Private constructor to prevent instantiation + private DataQualityEvaluator() {} Review Comment: indentation in this file is not right. Can you please fix it to 2 spaces for indentation ########## gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.policies.size; + +import java.util.Optional; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy; + +/** + * A task-level policy that checks if the bytes read matches the bytes written for a file copy operation. + */ +@Slf4j +public class FileSizePolicy extends TaskLevelPolicy { + + public static final String COPY_PREFIX = "gobblin.copy"; + public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead"; + public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten"; + + public FileSizePolicy(State state, TaskLevelPolicy.Type type) { + super(state, type); + } + + @Override + public Result executePolicy() { + Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state); + if (!bytes.isPresent()) { + return Result.FAILED; + } + Long bytesRead = bytes.get().getBytesRead(); + Long bytesWritten = bytes.get().getBytesWritten(); + + Long sizeDifference = Math.abs(bytesRead - bytesWritten); + + if (sizeDifference == 0) { + return Result.PASSED; + } + + log.warn("File size check failed - bytes read: {}, bytes written: {}, difference: {}", + bytesRead, bytesWritten, sizeDifference); + return Result.FAILED; + } + + @Override + public String toString() { + Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state); + if(bytes.isPresent()) { + return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]", bytes.get().getBytesRead(), bytes.get().getBytesWritten()); + } else{ + return "FileSizePolicy [bytesRead=null, bytesWritten=null]"; + } + } + + /** + * Helper class to hold transfer bytes information + */ + @Getter + private static class TransferBytes { + final Long bytesRead; + final Long bytesWritten; Review Comment: since we are already validating and parsing as long, 
using boxed Long is not required. Using primitives guarantees non-null and avoids unnecessary autoboxing ########## gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.policies.size; + +import java.util.Optional; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy; + +/** + * A task-level policy that checks if the bytes read matches the bytes written for a file copy operation. 
+ */ +@Slf4j +public class FileSizePolicy extends TaskLevelPolicy { + + public static final String COPY_PREFIX = "gobblin.copy"; + public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead"; + public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten"; + + public FileSizePolicy(State state, TaskLevelPolicy.Type type) { + super(state, type); + } + + @Override + public Result executePolicy() { + Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state); + if (!bytes.isPresent()) { + return Result.FAILED; + } + Long bytesRead = bytes.get().getBytesRead(); + Long bytesWritten = bytes.get().getBytesWritten(); + + Long sizeDifference = Math.abs(bytesRead - bytesWritten); + + if (sizeDifference == 0) { + return Result.PASSED; + } + + log.warn("File size check failed - bytes read: {}, bytes written: {}, difference: {}", + bytesRead, bytesWritten, sizeDifference); + return Result.FAILED; + } + + @Override + public String toString() { + Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state); + if(bytes.isPresent()) { Review Comment: `TransferBytes transferBytes = getBytesReadAndWritten(this.state).orElse(null);` ########## gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyChecker.java: ########## @@ -28,20 +29,13 @@ * executes each one, and then stores the output * in a PolicyCheckResults object */ +@Getter public class TaskLevelPolicyChecker { - /** - * An enumeration for possible statuses for Data quality checks, - * its values will be PASSED, FAILED, in case if data quality check - * evaluation is not performed for Job, it will be NOT_EVALUATED - */ - public enum DataQualityStatus { - PASSED, - FAILED, - NOT_EVALUATED - } private final List<TaskLevelPolicy> list; private static final Logger LOG = LoggerFactory.getLogger(TaskLevelPolicyChecker.class); + public static final String TASK_LEVEL_POLICY_RESULT_KEY = "gobblin.task.level.policy.result"; Review Comment: if there are multiple policies 
with one of type `optional` and another of type `fail`, then the failure of the `fail` policy would be masked, causing silent success even when the required policy is failing: `results.getPolicyResults().put(result, p.getType());` Because the results map is keyed by the result, a later `put` with the same result overwrites the earlier entry. E.g. if two policies return the following: ('FAILED' -> 'FAIL'), ('FAILED' -> 'OPTIONAL'), we will eventually see only ('FAILED' -> 'OPTIONAL') and not consider this a DQ failure ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java: ########## @@ -90,6 +94,14 @@ public Void call() metricContext = Instrumented.getMetricContext(datasetState, SafeDatasetCommit.class); finalizeDatasetStateBeforeCommit(this.datasetState); + // evaluate data quality at the dataset commit level, only when commit source is CommitActivityImpl + if(SafeDatasetCommit.COMMIT_SRC_COMMIT_ACTIVITY_IMPL.equals(this.datasetCommitSrc)){ + log.info("Evaluating data quality for commit activity for dataset {}.", this.datasetUrn); + evaluateAndEmitDatasetQuality(); + } else { + log.warn("Skipping data quality evaluation for dataset {} as commit source is {}", this.datasetUrn, + this.datasetCommitSrc); Review Comment: In which cases does it go into the else branch? Do we want to log this? If that's an expected scenario, this can be an `info` log ########## gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.policies.size; + +import java.util.Optional; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy; + +/** + * A task-level policy that checks if the bytes read matches the bytes written for a file copy operation. + */ +@Slf4j +public class FileSizePolicy extends TaskLevelPolicy { + + public static final String COPY_PREFIX = "gobblin.copy"; + public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead"; + public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten"; + + public FileSizePolicy(State state, TaskLevelPolicy.Type type) { + super(state, type); + } + + @Override + public Result executePolicy() { + Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state); + if (!bytes.isPresent()) { + return Result.FAILED; + } Review Comment: the `bytes.isPresent() -> bytes.get()` pattern works but can be simplified as below: ``` TransferBytes transferBytes = getBytesReadAndWritten(...).orElse(null); if (transferBytes == null) { return Result.FAILED; } long bytesRead = transferBytes.getBytesRead(); ... 
``` ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java: ########## @@ -90,6 +94,14 @@ public Void call() metricContext = Instrumented.getMetricContext(datasetState, SafeDatasetCommit.class); finalizeDatasetStateBeforeCommit(this.datasetState); + // evaluate data quality at the dataset commit level, only when commit source is CommitActivityImpl + if(SafeDatasetCommit.COMMIT_SRC_COMMIT_ACTIVITY_IMPL.equals(this.datasetCommitSrc)){ Review Comment: nit: formatting - use `if (...) {`, with a space after `if` and before `{` ########## gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.quality; + +import io.opentelemetry.api.common.Attributes; +import java.util.List; +import java.util.Properties; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.OpenTelemetryMetrics; +import org.apache.gobblin.metrics.OpenTelemetryMetricsBase; +import org.apache.gobblin.metrics.ServiceMetricNames; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.qualitychecker.DataQualityStatus; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.service.ServiceConfigKeys; + +/** + * Evaluates data quality for a set of task states and emits relevant metrics. + * This is a stateless utility class. + */ +@Slf4j +public class DataQualityEvaluator { + + private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = "gobblin.gaas.observability"; + + // Private constructor to prevent instantiation + private DataQualityEvaluator() {} + + /** + * Result of a data quality evaluation containing the overall status and metrics. + */ + @Getter + public static class DataQualityEvaluationResult { + private final DataQualityStatus qualityStatus; + private final int totalFiles; + private final int passedFiles; + private final int failedFiles; + // Number of files that were not evaluated for data quality for example files not found or not processed + private final int nonEvaluatedFiles; + + public DataQualityEvaluationResult(DataQualityStatus qualityStatus, int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) { + this.qualityStatus = qualityStatus; + this.totalFiles = totalFiles; + this.passedFiles = passedFiles; + this.failedFiles = failedFiles; + this.nonEvaluatedFiles = nonEvaluatedFiles; + } + } + + /** + * Evaluates the data quality of a dataset state and stores the result. 
+ * This method is specifically designed for dataset-level quality evaluation. + * + * @param datasetState The dataset state to evaluate and update + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState jobState) { + List<TaskState> taskStates = datasetState.getTaskStates(); + DataQualityEvaluationResult result = evaluateDataQuality(taskStates, jobState); + + // Store the result in the dataset state + jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, result.getQualityStatus().name()); + // Emit dataset-specific metrics + emitMetrics(jobState, result.getQualityStatus(), result.getTotalFiles(), + result.getPassedFiles(), result.getFailedFiles(), result.getNonEvaluatedFiles(), datasetState.getDatasetUrn()); + + return result; + } + + /** + * Evaluates the data quality of a set of task states and emits relevant metrics. 
+ * + * @param taskStates List of task states to evaluate + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateDataQuality(List<TaskState> taskStates, JobState jobState) { + DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED; + int totalFiles = 0; + int failedFilesCount = 0; + int passedFilesCount = 0; + int nonEvaluatedFilesCount = 0; + + for (TaskState taskState : taskStates) { + totalFiles++; + + // Handle null task states gracefully + if (taskState == null) { + log.warn("Encountered null task state, skipping data quality evaluation for this task"); + nonEvaluatedFilesCount++; + continue; + } + + DataQualityStatus taskDataQuality = null; + String result = taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY); + taskDataQuality = DataQualityStatus.fromString(result); + if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) { + log.debug("Data quality status of this task is: " + taskDataQuality); + if (DataQualityStatus.PASSED == taskDataQuality) { + passedFilesCount++; + } else if (DataQualityStatus.FAILED == taskDataQuality){ + failedFilesCount++; + jobDataQualityStatus = DataQualityStatus.FAILED; + } else { + log.warn("Unexpected data quality status: " + taskDataQuality + " for task: " + taskState.getTaskId()); + } + } else { + // Handle files without data quality evaluation + nonEvaluatedFilesCount++; + log.warn("No data quality evaluation for task: " + taskState.getTaskId()); + } + } + + // Log summary of evaluation + log.info("Data quality evaluation summary - Total: {}, Passed: {}, Failed: {}, Not Evaluated: {}", + totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount); + return new DataQualityEvaluationResult(jobDataQualityStatus, totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount); + } + + + private static void emitMetrics(JobState jobState, final 
DataQualityStatus jobDataQuality, final int totalFiles, + final int passedFilesCount, final int failedFilesCount, final int nonEvaluatedFilesCount, final String datasetUrn) { + try { + // Check if OpenTelemetry is enabled + boolean otelEnabled = jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED, + ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED); + + if (!otelEnabled) { + log.info("OpenTelemetry metrics disabled, skipping metrics emission"); + return; + } + + OpenTelemetryMetricsBase otelMetrics = OpenTelemetryMetrics.getInstance(jobState); + log.info("OpenTelemetry instance obtained: {}", otelMetrics != null); + + if (otelMetrics != null) { + Attributes tags = getTagsForDataQualityMetrics(jobState, datasetUrn); + log.info("Tags for data quality metrics: " + tags.toString()); + // Emit data quality status (1 for PASSED, 0 for FAILED) + log.info("Data quality status for this job is " + jobDataQuality); + if (jobDataQuality == DataQualityStatus.PASSED) { + log.info("Data quality passed for job: {}", jobState.getJobName()); + otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME) Review Comment: we can avoid repeated getMeter(...) calls and assign the meter to a local variable once and reuse it for all metrics: ``` Meter meter = otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME); ``` ########## gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.quality; + +import io.opentelemetry.api.common.Attributes; +import java.util.List; +import java.util.Properties; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.OpenTelemetryMetrics; +import org.apache.gobblin.metrics.OpenTelemetryMetricsBase; +import org.apache.gobblin.metrics.ServiceMetricNames; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.qualitychecker.DataQualityStatus; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.service.ServiceConfigKeys; + +/** + * Evaluates data quality for a set of task states and emits relevant metrics. + * This is a stateless utility class. + */ +@Slf4j +public class DataQualityEvaluator { + + private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = "gobblin.gaas.observability"; + + // Private constructor to prevent instantiation + private DataQualityEvaluator() {} + + /** + * Result of a data quality evaluation containing the overall status and metrics. 
+ */ + @Getter + public static class DataQualityEvaluationResult { + private final DataQualityStatus qualityStatus; + private final int totalFiles; + private final int passedFiles; + private final int failedFiles; + // Number of files that were not evaluated for data quality for example files not found or not processed + private final int nonEvaluatedFiles; + + public DataQualityEvaluationResult(DataQualityStatus qualityStatus, int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) { + this.qualityStatus = qualityStatus; + this.totalFiles = totalFiles; + this.passedFiles = passedFiles; + this.failedFiles = failedFiles; + this.nonEvaluatedFiles = nonEvaluatedFiles; + } + } + + /** + * Evaluates the data quality of a dataset state and stores the result. + * This method is specifically designed for dataset-level quality evaluation. + * + * @param datasetState The dataset state to evaluate and update + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState jobState) { + List<TaskState> taskStates = datasetState.getTaskStates(); + DataQualityEvaluationResult result = evaluateDataQuality(taskStates, jobState); + + // Store the result in the dataset state + jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, result.getQualityStatus().name()); + // Emit dataset-specific metrics + emitMetrics(jobState, result.getQualityStatus(), result.getTotalFiles(), + result.getPassedFiles(), result.getFailedFiles(), result.getNonEvaluatedFiles(), datasetState.getDatasetUrn()); + + return result; + } + + /** + * Evaluates the data quality of a set of task states and emits relevant metrics. 
+ * + * @param taskStates List of task states to evaluate + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateDataQuality(List<TaskState> taskStates, JobState jobState) { + DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED; + int totalFiles = 0; + int failedFilesCount = 0; + int passedFilesCount = 0; + int nonEvaluatedFilesCount = 0; + + for (TaskState taskState : taskStates) { + totalFiles++; + + // Handle null task states gracefully + if (taskState == null) { + log.warn("Encountered null task state, skipping data quality evaluation for this task"); + nonEvaluatedFilesCount++; + continue; + } + + DataQualityStatus taskDataQuality = null; + String result = taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY); + taskDataQuality = DataQualityStatus.fromString(result); + if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) { + log.debug("Data quality status of this task is: " + taskDataQuality); + if (DataQualityStatus.PASSED == taskDataQuality) { + passedFilesCount++; + } else if (DataQualityStatus.FAILED == taskDataQuality){ + failedFilesCount++; + jobDataQualityStatus = DataQualityStatus.FAILED; + } else { + log.warn("Unexpected data quality status: " + taskDataQuality + " for task: " + taskState.getTaskId()); + } + } else { + // Handle files without data quality evaluation + nonEvaluatedFilesCount++; + log.warn("No data quality evaluation for task: " + taskState.getTaskId()); + } + } + + // Log summary of evaluation + log.info("Data quality evaluation summary - Total: {}, Passed: {}, Failed: {}, Not Evaluated: {}", + totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount); + return new DataQualityEvaluationResult(jobDataQualityStatus, totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount); + } + + + private static void emitMetrics(JobState jobState, final 
DataQualityStatus jobDataQuality, final int totalFiles, + final int passedFilesCount, final int failedFilesCount, final int nonEvaluatedFilesCount, final String datasetUrn) { + try { + // Check if OpenTelemetry is enabled + boolean otelEnabled = jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED, + ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED); + + if (!otelEnabled) { + log.info("OpenTelemetry metrics disabled, skipping metrics emission"); + return; + } + + OpenTelemetryMetricsBase otelMetrics = OpenTelemetryMetrics.getInstance(jobState); + log.info("OpenTelemetry instance obtained: {}", otelMetrics != null); + + if (otelMetrics != null) { + Attributes tags = getTagsForDataQualityMetrics(jobState, datasetUrn); + log.info("Tags for data quality metrics: " + tags.toString()); + // Emit data quality status (1 for PASSED, 0 for FAILED) + log.info("Data quality status for this job is " + jobDataQuality); + if (jobDataQuality == DataQualityStatus.PASSED) { + log.info("Data quality passed for job: {}", jobState.getJobName()); + otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME) + .counterBuilder(ServiceMetricNames.DATA_QUALITY_JOB_SUCCESS_COUNT) + .setDescription("Number of jobs that passed data quality") + .build() + .add(1, tags); + } else { + log.info("Data quality failed for job: {}", jobState.getJobName()); + otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME) + .counterBuilder(ServiceMetricNames.DATA_QUALITY_JOB_FAILURE_COUNT) Review Comment: since if/else differs in metric name only, we can consider determining the metric name first and remove this if/else block using below: ``` String jobMetricName = (jobDataQuality == DataQualityStatus.PASSED) ? 
ServiceMetricNames.DATA_QUALITY_JOB_SUCCESS_COUNT : ServiceMetricNames.DATA_QUALITY_JOB_FAILURE_COUNT; ``` ########## gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.policies.size; + +import java.util.Optional; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy; + +/** + * A task-level policy that checks if the bytes read matches the bytes written for a file copy operation. 
+ */ +@Slf4j +public class FileSizePolicy extends TaskLevelPolicy { + + public static final String COPY_PREFIX = "gobblin.copy"; + public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead"; + public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten"; + + public FileSizePolicy(State state, TaskLevelPolicy.Type type) { + super(state, type); + } + + @Override + public Result executePolicy() { + Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state); + if (!bytes.isPresent()) { + return Result.FAILED; + } + Long bytesRead = bytes.get().getBytesRead(); + Long bytesWritten = bytes.get().getBytesWritten(); + + Long sizeDifference = Math.abs(bytesRead - bytesWritten); + + if (sizeDifference == 0) { + return Result.PASSED; + } + + log.warn("File size check failed - bytes read: {}, bytes written: {}, difference: {}", + bytesRead, bytesWritten, sizeDifference); + return Result.FAILED; + } + + @Override + public String toString() { + Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state); + if(bytes.isPresent()) { + return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]", bytes.get().getBytesRead(), bytes.get().getBytesWritten()); + } else{ Review Comment: nit: add a space between `else` and `{` (i.e. write `else {`); please update similarly at other places. Please refer to https://gobblin.apache.org/docs/developer-guide/CodingStyle/ to import the code style XML into your IDE, which handles this automatically ########## gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.quality; + +import io.opentelemetry.api.common.Attributes; +import java.util.List; +import java.util.Properties; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.OpenTelemetryMetrics; +import org.apache.gobblin.metrics.OpenTelemetryMetricsBase; +import org.apache.gobblin.metrics.ServiceMetricNames; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.qualitychecker.DataQualityStatus; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.service.ServiceConfigKeys; + +/** + * Evaluates data quality for a set of task states and emits relevant metrics. + * This is a stateless utility class. + */ +@Slf4j +public class DataQualityEvaluator { + + private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = "gobblin.gaas.observability"; + + // Private constructor to prevent instantiation + private DataQualityEvaluator() {} + + /** + * Result of a data quality evaluation containing the overall status and metrics. 
+ */ + @Getter + public static class DataQualityEvaluationResult { + private final DataQualityStatus qualityStatus; + private final int totalFiles; + private final int passedFiles; + private final int failedFiles; + // Number of files that were not evaluated for data quality for example files not found or not processed + private final int nonEvaluatedFiles; + + public DataQualityEvaluationResult(DataQualityStatus qualityStatus, int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) { + this.qualityStatus = qualityStatus; + this.totalFiles = totalFiles; + this.passedFiles = passedFiles; + this.failedFiles = failedFiles; + this.nonEvaluatedFiles = nonEvaluatedFiles; + } + } + + /** + * Evaluates the data quality of a dataset state and stores the result. + * This method is specifically designed for dataset-level quality evaluation. + * + * @param datasetState The dataset state to evaluate and update + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState jobState) { + List<TaskState> taskStates = datasetState.getTaskStates(); + DataQualityEvaluationResult result = evaluateDataQuality(taskStates, jobState); + + // Store the result in the dataset state + jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, result.getQualityStatus().name()); + // Emit dataset-specific metrics + emitMetrics(jobState, result.getQualityStatus(), result.getTotalFiles(), + result.getPassedFiles(), result.getFailedFiles(), result.getNonEvaluatedFiles(), datasetState.getDatasetUrn()); + + return result; + } + + /** + * Evaluates the data quality of a set of task states and emits relevant metrics. 
+ * + * @param taskStates List of task states to evaluate + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateDataQuality(List<TaskState> taskStates, JobState jobState) { + DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED; + int totalFiles = 0; + int failedFilesCount = 0; + int passedFilesCount = 0; + int nonEvaluatedFilesCount = 0; + + for (TaskState taskState : taskStates) { + totalFiles++; + + // Handle null task states gracefully + if (taskState == null) { + log.warn("Encountered null task state, skipping data quality evaluation for this task"); + nonEvaluatedFilesCount++; + continue; + } + + DataQualityStatus taskDataQuality = null; + String result = taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY); + taskDataQuality = DataQualityStatus.fromString(result); + if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) { + log.debug("Data quality status of this task is: " + taskDataQuality); + if (DataQualityStatus.PASSED == taskDataQuality) { + passedFilesCount++; + } else if (DataQualityStatus.FAILED == taskDataQuality){ + failedFilesCount++; + jobDataQualityStatus = DataQualityStatus.FAILED; + } else { + log.warn("Unexpected data quality status: " + taskDataQuality + " for task: " + taskState.getTaskId()); + } + } else { + // Handle files without data quality evaluation + nonEvaluatedFilesCount++; + log.warn("No data quality evaluation for task: " + taskState.getTaskId()); + } + } + + // Log summary of evaluation + log.info("Data quality evaluation summary - Total: {}, Passed: {}, Failed: {}, Not Evaluated: {}", + totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount); + return new DataQualityEvaluationResult(jobDataQualityStatus, totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount); + } + + + private static void emitMetrics(JobState jobState, final 
DataQualityStatus jobDataQuality, final int totalFiles, + final int passedFilesCount, final int failedFilesCount, final int nonEvaluatedFilesCount, final String datasetUrn) { + try { + // Check if OpenTelemetry is enabled + boolean otelEnabled = jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED, + ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED); + + if (!otelEnabled) { + log.info("OpenTelemetry metrics disabled, skipping metrics emission"); + return; + } + + OpenTelemetryMetricsBase otelMetrics = OpenTelemetryMetrics.getInstance(jobState); + log.info("OpenTelemetry instance obtained: {}", otelMetrics != null); + + if (otelMetrics != null) { + Attributes tags = getTagsForDataQualityMetrics(jobState, datasetUrn); + log.info("Tags for data quality metrics: " + tags.toString()); + // Emit data quality status (1 for PASSED, 0 for FAILED) Review Comment: we are emitting different metrics now.. this comment can be updated ########## gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.quality; + +import io.opentelemetry.api.common.Attributes; +import java.util.List; +import java.util.Properties; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.OpenTelemetryMetrics; +import org.apache.gobblin.metrics.OpenTelemetryMetricsBase; +import org.apache.gobblin.metrics.ServiceMetricNames; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.qualitychecker.DataQualityStatus; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.service.ServiceConfigKeys; + +/** + * Evaluates data quality for a set of task states and emits relevant metrics. + * This is a stateless utility class. + */ +@Slf4j +public class DataQualityEvaluator { + + private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = "gobblin.gaas.observability"; + + // Private constructor to prevent instantiation + private DataQualityEvaluator() {} + + /** + * Result of a data quality evaluation containing the overall status and metrics. + */ + @Getter + public static class DataQualityEvaluationResult { + private final DataQualityStatus qualityStatus; + private final int totalFiles; + private final int passedFiles; + private final int failedFiles; + // Number of files that were not evaluated for data quality for example files not found or not processed + private final int nonEvaluatedFiles; + + public DataQualityEvaluationResult(DataQualityStatus qualityStatus, int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) { + this.qualityStatus = qualityStatus; + this.totalFiles = totalFiles; + this.passedFiles = passedFiles; + this.failedFiles = failedFiles; + this.nonEvaluatedFiles = nonEvaluatedFiles; + } + } + + /** + * Evaluates the data quality of a dataset state and stores the result. 
+ * This method is specifically designed for dataset-level quality evaluation. + * + * @param datasetState The dataset state to evaluate and update + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState jobState) { + List<TaskState> taskStates = datasetState.getTaskStates(); + DataQualityEvaluationResult result = evaluateDataQuality(taskStates, jobState); + + // Store the result in the dataset state + jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, result.getQualityStatus().name()); + // Emit dataset-specific metrics + emitMetrics(jobState, result.getQualityStatus(), result.getTotalFiles(), + result.getPassedFiles(), result.getFailedFiles(), result.getNonEvaluatedFiles(), datasetState.getDatasetUrn()); + + return result; + } + + /** + * Evaluates the data quality of a set of task states and emits relevant metrics. 
+ * + * @param taskStates List of task states to evaluate + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateDataQuality(List<TaskState> taskStates, JobState jobState) { + DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED; + int totalFiles = 0; + int failedFilesCount = 0; + int passedFilesCount = 0; + int nonEvaluatedFilesCount = 0; + + for (TaskState taskState : taskStates) { + totalFiles++; + + // Handle null task states gracefully + if (taskState == null) { + log.warn("Encountered null task state, skipping data quality evaluation for this task"); + nonEvaluatedFilesCount++; + continue; + } + + DataQualityStatus taskDataQuality = null; + String result = taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY); + taskDataQuality = DataQualityStatus.fromString(result); + if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) { + log.debug("Data quality status of this task is: " + taskDataQuality); + if (DataQualityStatus.PASSED == taskDataQuality) { + passedFilesCount++; + } else if (DataQualityStatus.FAILED == taskDataQuality){ + failedFilesCount++; + jobDataQualityStatus = DataQualityStatus.FAILED; + } else { + log.warn("Unexpected data quality status: " + taskDataQuality + " for task: " + taskState.getTaskId()); + } + } else { + // Handle files without data quality evaluation + nonEvaluatedFilesCount++; + log.warn("No data quality evaluation for task: " + taskState.getTaskId()); + } + } + + // Log summary of evaluation + log.info("Data quality evaluation summary - Total: {}, Passed: {}, Failed: {}, Not Evaluated: {}", + totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount); + return new DataQualityEvaluationResult(jobDataQualityStatus, totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount); + } + + + private static void emitMetrics(JobState jobState, final 
DataQualityStatus jobDataQuality, final int totalFiles, + final int passedFilesCount, final int failedFilesCount, final int nonEvaluatedFilesCount, final String datasetUrn) { + try { + // Check if OpenTelemetry is enabled + boolean otelEnabled = jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED, + ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED); + + if (!otelEnabled) { + log.info("OpenTelemetry metrics disabled, skipping metrics emission"); + return; + } + + OpenTelemetryMetricsBase otelMetrics = OpenTelemetryMetrics.getInstance(jobState); + log.info("OpenTelemetry instance obtained: {}", otelMetrics != null); + + if (otelMetrics != null) { + Attributes tags = getTagsForDataQualityMetrics(jobState, datasetUrn); + log.info("Tags for data quality metrics: " + tags.toString()); + // Emit data quality status (1 for PASSED, 0 for FAILED) + log.info("Data quality status for this job is " + jobDataQuality); Review Comment: can combine multiple logging statements into one ``` log.info("Emitting DQ metrics for job={}, status={}, tags={}", jobState.getJobName(), jobDataQuality, tags); ``` ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java: ########## @@ -613,6 +629,11 @@ private boolean checkDataQuality(Optional<Object> schema) TaskLevelPolicyCheckResults taskResults = this.taskContext.getTaskLevelPolicyChecker(this.forkTaskState, this.branches > 1 ? this.index : -1) .executePolicies(); + boolean allRequiredPoliciesPassed = taskResults.getPolicyResults().entrySet().stream() + .filter(e -> e.getValue() == TaskLevelPolicy.Type.FAIL) + .allMatch(e -> e.getKey() == TaskLevelPolicy.Result.PASSED); + forkTaskState.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY, + allRequiredPoliciesPassed ? DataQualityStatus.PASSED.name() : DataQualityStatus.FAILED.name()); Review Comment: we want to mark DQ status as failure if any policy fail, right? 
if yes, shouldn't we check that there is no `Result.FAILURE` key and that `Result.PASSED` is present and based on that, mark DQ status as SUCCESS ########## gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.quality; + +import io.opentelemetry.api.common.Attributes; +import java.util.List; +import java.util.Properties; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.OpenTelemetryMetrics; +import org.apache.gobblin.metrics.OpenTelemetryMetricsBase; +import org.apache.gobblin.metrics.ServiceMetricNames; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.qualitychecker.DataQualityStatus; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.service.ServiceConfigKeys; + +/** + * Evaluates data quality for a set of task states and emits relevant metrics. + * This is a stateless utility class. 
+ */ +@Slf4j +public class DataQualityEvaluator { + + private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = "gobblin.gaas.observability"; + + // Private constructor to prevent instantiation + private DataQualityEvaluator() {} + + /** + * Result of a data quality evaluation containing the overall status and metrics. + */ + @Getter + public static class DataQualityEvaluationResult { + private final DataQualityStatus qualityStatus; + private final int totalFiles; + private final int passedFiles; + private final int failedFiles; + // Number of files that were not evaluated for data quality for example files not found or not processed + private final int nonEvaluatedFiles; + + public DataQualityEvaluationResult(DataQualityStatus qualityStatus, int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) { + this.qualityStatus = qualityStatus; + this.totalFiles = totalFiles; + this.passedFiles = passedFiles; + this.failedFiles = failedFiles; + this.nonEvaluatedFiles = nonEvaluatedFiles; + } + } + + /** + * Evaluates the data quality of a dataset state and stores the result. + * This method is specifically designed for dataset-level quality evaluation. 
+ * + * @param datasetState The dataset state to evaluate and update + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState jobState) { + List<TaskState> taskStates = datasetState.getTaskStates(); + DataQualityEvaluationResult result = evaluateDataQuality(taskStates, jobState); + + // Store the result in the dataset state + jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, result.getQualityStatus().name()); + // Emit dataset-specific metrics + emitMetrics(jobState, result.getQualityStatus(), result.getTotalFiles(), + result.getPassedFiles(), result.getFailedFiles(), result.getNonEvaluatedFiles(), datasetState.getDatasetUrn()); + + return result; + } + + /** + * Evaluates the data quality of a set of task states and emits relevant metrics. + * + * @param taskStates List of task states to evaluate + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateDataQuality(List<TaskState> taskStates, JobState jobState) { + DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED; + int totalFiles = 0; + int failedFilesCount = 0; + int passedFilesCount = 0; + int nonEvaluatedFilesCount = 0; + + for (TaskState taskState : taskStates) { + totalFiles++; + + // Handle null task states gracefully + if (taskState == null) { + log.warn("Encountered null task state, skipping data quality evaluation for this task"); + nonEvaluatedFilesCount++; + continue; + } + + DataQualityStatus taskDataQuality = null; + String result = taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY); + taskDataQuality = DataQualityStatus.fromString(result); + if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) { + log.debug("Data 
quality status of this task is: " + taskDataQuality); + if (DataQualityStatus.PASSED == taskDataQuality) { + passedFilesCount++; + } else if (DataQualityStatus.FAILED == taskDataQuality){ + failedFilesCount++; + jobDataQualityStatus = DataQualityStatus.FAILED; + } else { + log.warn("Unexpected data quality status: " + taskDataQuality + " for task: " + taskState.getTaskId()); + } + } else { + // Handle files without data quality evaluation + nonEvaluatedFilesCount++; + log.warn("No data quality evaluation for task: " + taskState.getTaskId()); + } + } + + // Log summary of evaluation + log.info("Data quality evaluation summary - Total: {}, Passed: {}, Failed: {}, Not Evaluated: {}", + totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount); + return new DataQualityEvaluationResult(jobDataQualityStatus, totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount); + } + + + private static void emitMetrics(JobState jobState, final DataQualityStatus jobDataQuality, final int totalFiles, + final int passedFilesCount, final int failedFilesCount, final int nonEvaluatedFilesCount, final String datasetUrn) { + try { + // Check if OpenTelemetry is enabled + boolean otelEnabled = jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED, + ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED); + + if (!otelEnabled) { + log.info("OpenTelemetry metrics disabled, skipping metrics emission"); + return; + } + + OpenTelemetryMetricsBase otelMetrics = OpenTelemetryMetrics.getInstance(jobState); + log.info("OpenTelemetry instance obtained: {}", otelMetrics != null); + + if (otelMetrics != null) { Review Comment: we can check for `if (otelMetrics == null)` and return early. It also simplifies the rest of the method by reducing indentation and nesting, making the main logic easier to follow. 
Issue Time Tracking ------------------- Worklog Id: (was: 977040) Time Spent: 1h 40m (was: 1.5h) > FileSize Data Quality implementation for FileBasedCopy > ------------------------------------------------------ > > Key: GOBBLIN-2204 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2204 > Project: Apache Gobblin > Issue Type: Task > Reporter: Vaibhav Singhal > Priority: Major > Time Spent: 1h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.20.10#820010)