[ https://issues.apache.org/jira/browse/GOBBLIN-2204?focusedWorklogId=975566&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-975566 ]
ASF GitHub Bot logged work on GOBBLIN-2204: ------------------------------------------- Author: ASF GitHub Bot Created on: 22/Jul/25 03:32 Start Date: 22/Jul/25 03:32 Worklog Time Spent: 10m Work Description: vsinghal85 commented on code in PR #4113: URL: https://github.com/apache/gobblin/pull/4113#discussion_r2220945507 ########## gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java: ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.quality; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.OpenTelemetryMetrics; +import org.apache.gobblin.metrics.OpenTelemetryMetricsBase; +import org.apache.gobblin.metrics.ServiceMetricNames; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.qualitychecker.DataQualityStatus; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.service.ServiceConfigKeys; + +/** + * Evaluates data quality for a set of task states and emits relevant metrics. + * This is a stateless utility class. + */ +@Slf4j +public class DataQualityEvaluator { + + private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = "gobblin.gaas.observability"; + + // Private constructor to prevent instantiation + private DataQualityEvaluator() {} + + /** + * Result of a data quality evaluation containing the overall status and metrics. 
+ */ + @Getter + public static class DataQualityEvaluationResult { + private final DataQualityStatus qualityStatus; + private final int totalFiles; + private final int passedFiles; + private final int failedFiles; + // Number of files that were not evaluated for data quality for example files not found or not processed + private final int nonEvaluatedFiles; + + public DataQualityEvaluationResult(DataQualityStatus qualityStatus, int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) { + this.qualityStatus = qualityStatus; + this.totalFiles = totalFiles; + this.passedFiles = passedFiles; + this.failedFiles = failedFiles; + this.nonEvaluatedFiles = nonEvaluatedFiles; + } + } + + /** + * Evaluates the data quality of a dataset state and stores the result. + * This method is specifically designed for dataset-level quality evaluation. + * + * @param datasetState The dataset state to evaluate and update + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState jobState) { + List<TaskState> taskStates = datasetState.getTaskStates(); + DataQualityEvaluationResult result = evaluateDataQuality(taskStates, jobState); + + // Store the result in the dataset state + jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, result.getQualityStatus().name()); + // Emit dataset-specific metrics + emitMetrics(jobState, result.getQualityStatus() == DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(), Review Comment: Valid suggestion. I was trying to follow a convention similar to the one used for jobSucceeded, where we have 1 for success and 0 for failure, but I agree this should help us have better tracking. 
Issue Time Tracking ------------------- Worklog Id: (was: 975566) Time Spent: 1.5h (was: 1h 20m) > FileSize Data Quality implementation for FileBasedCopy > ------------------------------------------------------ > > Key: GOBBLIN-2204 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2204 > Project: Apache Gobblin > Issue Type: Task > Reporter: Vaibhav Singhal > Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.20.10#820010)