[ https://issues.apache.org/jira/browse/GOBBLIN-2204?focusedWorklogId=975539&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-975539 ]
ASF GitHub Bot logged work on GOBBLIN-2204: ------------------------------------------- Author: ASF GitHub Bot Created on: 21/Jul/25 20:57 Start Date: 21/Jul/25 20:57 Worklog Time Spent: 10m Work Description: khandelwal-prateek commented on code in PR #4113: URL: https://github.com/apache/gobblin/pull/4113#discussion_r2220106656 ########## gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.policies.size; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy; + +/** + * A task-level policy that checks if the bytes read matches the bytes written for a file copy operation. 
+ */ +@Slf4j +public class FileSizePolicy extends TaskLevelPolicy { + + public static final String COPY_PREFIX = "gobblin.copy"; + public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead"; + public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten"; + + public FileSizePolicy(State state, TaskLevelPolicy.Type type) { + super(state, type); + } + + @Override + public Result executePolicy() { + TransferBytes bytes = getBytesReadAndWritten(this.state); + if (bytes == null) { + return Result.FAILED; + } + Long bytesRead = bytes.getBytesRead(); + Long bytesWritten = bytes.getBytesWritten(); + + if(bytesRead == null || bytesWritten == null) { Review Comment: nit: add space after `if` ########## gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.policies.size; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy; + +/** + * A task-level policy that checks if the bytes read matches the bytes written for a file copy operation. 
+ */ +@Slf4j +public class FileSizePolicy extends TaskLevelPolicy { + + public static final String COPY_PREFIX = "gobblin.copy"; + public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead"; + public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten"; + + public FileSizePolicy(State state, TaskLevelPolicy.Type type) { + super(state, type); + } + + @Override + public Result executePolicy() { + TransferBytes bytes = getBytesReadAndWritten(this.state); + if (bytes == null) { + return Result.FAILED; + } + Long bytesRead = bytes.getBytesRead(); + Long bytesWritten = bytes.getBytesWritten(); + + if(bytesRead == null || bytesWritten == null) { + log.error("Missing value(s): bytesRead={}, bytesWritten={}", bytesRead, bytesWritten); + return Result.FAILED; + } + Long sizeDifference = Math.abs(bytesRead - bytesWritten); + + if (sizeDifference == 0) { + return Result.PASSED; + } + + log.warn("File size check failed - bytes read: {}, bytes written: {}, difference: {}", + bytesRead, bytesWritten, sizeDifference); + return Result.FAILED; + } + + @Override + public String toString() { + TransferBytes bytes = getBytesReadAndWritten(this.state); + return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]", bytes.getBytesRead(), bytes.getBytesWritten()); + } + + /** + * Helper class to hold transfer bytes information + */ + @Getter + private static class TransferBytes { + final Long bytesRead; + final Long bytesWritten; + TransferBytes(Long bytesRead, Long bytesWritten) { + this.bytesRead = bytesRead; + this.bytesWritten = bytesWritten; + } + } + + /** + * Extracts bytesRead and bytesWritten from the given state. + * Returns null if parsing fails. + */ + private TransferBytes getBytesReadAndWritten(State state) { Review Comment: We can return `Optional<TransferBytes>` from this instead of relying on null or constructing a partially invalid object. 
Optional clearly expresses the possibility of absence and avoids null-filled objects like `TransferBytes(null, null)`, making the failure path more explicit for the caller ########## gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java: ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.quality; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.OpenTelemetryMetrics; +import org.apache.gobblin.metrics.OpenTelemetryMetricsBase; +import org.apache.gobblin.metrics.ServiceMetricNames; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.qualitychecker.DataQualityStatus; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.service.ServiceConfigKeys; + +/** + * Evaluates data quality for a set of task states and emits relevant metrics. + * This is a stateless utility class. + */ +@Slf4j +public class DataQualityEvaluator { + + private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = "gobblin.gaas.observability"; + + // Private constructor to prevent instantiation + private DataQualityEvaluator() {} + + /** + * Result of a data quality evaluation containing the overall status and metrics. 
+ */ + @Getter + public static class DataQualityEvaluationResult { + private final DataQualityStatus qualityStatus; + private final int totalFiles; + private final int passedFiles; + private final int failedFiles; + // Number of files that were not evaluated for data quality for example files not found or not processed + private final int nonEvaluatedFiles; + + public DataQualityEvaluationResult(DataQualityStatus qualityStatus, int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) { + this.qualityStatus = qualityStatus; + this.totalFiles = totalFiles; + this.passedFiles = passedFiles; + this.failedFiles = failedFiles; + this.nonEvaluatedFiles = nonEvaluatedFiles; + } + } + + /** + * Evaluates the data quality of a dataset state and stores the result. + * This method is specifically designed for dataset-level quality evaluation. + * + * @param datasetState The dataset state to evaluate and update + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState jobState) { + List<TaskState> taskStates = datasetState.getTaskStates(); + DataQualityEvaluationResult result = evaluateDataQuality(taskStates, jobState); + + // Store the result in the dataset state + jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, result.getQualityStatus().name()); + // Emit dataset-specific metrics + emitMetrics(jobState, result.getQualityStatus() == DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(), + result.getPassedFiles(), result.getFailedFiles(), result.nonEvaluatedFiles, datasetState.getDatasetUrn()); + + return result; + } + + /** + * Evaluates the data quality of a set of task states and emits relevant metrics. 
+ * + * @param taskStates List of task states to evaluate + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateDataQuality(List<TaskState> taskStates, JobState jobState) { + DataQualityStatus jobDataQuality = DataQualityStatus.PASSED; + int totalFiles = 0; + int failedFilesSize = 0; Review Comment: rename `failedFilesSize` -> `failedFilesCount` (and likewise the other `*FilesSize` locals) to better reflect that these are counts and to match the naming used in metrics emission ########## gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java: ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gobblin.quality; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.OpenTelemetryMetrics; +import org.apache.gobblin.metrics.OpenTelemetryMetricsBase; +import org.apache.gobblin.metrics.ServiceMetricNames; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.qualitychecker.DataQualityStatus; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.service.ServiceConfigKeys; + +/** + * Evaluates data quality for a set of task states and emits relevant metrics. + * This is a stateless utility class. + */ +@Slf4j +public class DataQualityEvaluator { + + private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = "gobblin.gaas.observability"; + + // Private constructor to prevent instantiation + private DataQualityEvaluator() {} + + /** + * Result of a data quality evaluation containing the overall status and metrics. 
+ */ + @Getter + public static class DataQualityEvaluationResult { + private final DataQualityStatus qualityStatus; + private final int totalFiles; + private final int passedFiles; + private final int failedFiles; + // Number of files that were not evaluated for data quality for example files not found or not processed + private final int nonEvaluatedFiles; + + public DataQualityEvaluationResult(DataQualityStatus qualityStatus, int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) { + this.qualityStatus = qualityStatus; + this.totalFiles = totalFiles; + this.passedFiles = passedFiles; + this.failedFiles = failedFiles; + this.nonEvaluatedFiles = nonEvaluatedFiles; + } + } + + /** + * Evaluates the data quality of a dataset state and stores the result. + * This method is specifically designed for dataset-level quality evaluation. + * + * @param datasetState The dataset state to evaluate and update + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState jobState) { + List<TaskState> taskStates = datasetState.getTaskStates(); + DataQualityEvaluationResult result = evaluateDataQuality(taskStates, jobState); + + // Store the result in the dataset state + jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, result.getQualityStatus().name()); + // Emit dataset-specific metrics + emitMetrics(jobState, result.getQualityStatus() == DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(), + result.getPassedFiles(), result.getFailedFiles(), result.nonEvaluatedFiles, datasetState.getDatasetUrn()); + + return result; + } + + /** + * Evaluates the data quality of a set of task states and emits relevant metrics. 
+ * + * @param taskStates List of task states to evaluate + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateDataQuality(List<TaskState> taskStates, JobState jobState) { + DataQualityStatus jobDataQuality = DataQualityStatus.PASSED; + int totalFiles = 0; + int failedFilesSize = 0; + int passedFilesSize = 0; + int nonEvaluatedFilesSize = 0; + + for (TaskState taskState : taskStates) { + totalFiles++; + + // Handle null task states gracefully + if (taskState == null) { + log.warn("Encountered null task state, skipping data quality evaluation for this task"); + nonEvaluatedFilesSize++; + continue; + } + + DataQualityStatus taskDataQuality = null; + String result = taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY); + taskDataQuality = DataQualityStatus.fromString(result); + if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) { + log.debug("Data quality status of this task is: " + taskDataQuality); + if (DataQualityStatus.PASSED == taskDataQuality) { + passedFilesSize++; + } else if (DataQualityStatus.FAILED == taskDataQuality){ + failedFilesSize++; + jobDataQuality = DataQualityStatus.FAILED; + } + } else { + // Handle files without data quality evaluation + nonEvaluatedFilesSize++; + log.warn("No data quality evaluation for task: " + taskState.getTaskId()); + } + } + + // Log summary of evaluation + log.info("Data quality evaluation summary - Total: {}, Passed: {}, Failed: {}, Not Evaluated: {}", + totalFiles, passedFilesSize, failedFilesSize, nonEvaluatedFilesSize); + return new DataQualityEvaluationResult(jobDataQuality, totalFiles, passedFilesSize, failedFilesSize, nonEvaluatedFilesSize); + } + + private static void emitMetrics(JobState jobState, int jobDataQuality, int totalFiles, + int passedFilesSize, int failedFilesSize, int nonEvaluatedFilesSize, String datasetUrn) { + OpenTelemetryMetricsBase 
otelMetrics = OpenTelemetryMetrics.getInstance(jobState); + if(otelMetrics != null) { + Meter meter = otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME); + AtomicLong jobDataQualityRef = new AtomicLong(jobDataQuality); + AtomicLong totalFilesRef = new AtomicLong(totalFiles); + AtomicLong passedFilesRef = new AtomicLong(passedFilesSize); + AtomicLong failedFilesRef = new AtomicLong(failedFilesSize); + AtomicLong nonEvaluatedFilesRef = new AtomicLong(nonEvaluatedFilesSize); Review Comment: Any reason to use `AtomicLong`? It should not be needed since the metric values are immutable snapshots and not being mutated or shared across threads ########## gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/DataQualityStatus.java: ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.qualitychecker; + +import lombok.extern.slf4j.Slf4j; + + +/** + * An enumeration for possible statuses for Data quality checks. 
+ * Its values will be: + * - PASSED: When all data quality checks pass + * - FAILED: When any data quality check fails + * - NOT_EVALUATED: When data quality check evaluation is not performed Review Comment: add javadoc for `UNKNOWN` ########## gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java: ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.quality; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.OpenTelemetryMetrics; +import org.apache.gobblin.metrics.OpenTelemetryMetricsBase; +import org.apache.gobblin.metrics.ServiceMetricNames; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.qualitychecker.DataQualityStatus; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.service.ServiceConfigKeys; + +/** + * Evaluates data quality for a set of task states and emits relevant metrics. + * This is a stateless utility class. + */ +@Slf4j +public class DataQualityEvaluator { + + private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = "gobblin.gaas.observability"; + + // Private constructor to prevent instantiation + private DataQualityEvaluator() {} + + /** + * Result of a data quality evaluation containing the overall status and metrics. 
+ */ + @Getter + public static class DataQualityEvaluationResult { + private final DataQualityStatus qualityStatus; + private final int totalFiles; + private final int passedFiles; + private final int failedFiles; + // Number of files that were not evaluated for data quality for example files not found or not processed + private final int nonEvaluatedFiles; + + public DataQualityEvaluationResult(DataQualityStatus qualityStatus, int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) { + this.qualityStatus = qualityStatus; + this.totalFiles = totalFiles; + this.passedFiles = passedFiles; + this.failedFiles = failedFiles; + this.nonEvaluatedFiles = nonEvaluatedFiles; + } + } + + /** + * Evaluates the data quality of a dataset state and stores the result. + * This method is specifically designed for dataset-level quality evaluation. + * + * @param datasetState The dataset state to evaluate and update + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState jobState) { + List<TaskState> taskStates = datasetState.getTaskStates(); + DataQualityEvaluationResult result = evaluateDataQuality(taskStates, jobState); + + // Store the result in the dataset state + jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, result.getQualityStatus().name()); + // Emit dataset-specific metrics + emitMetrics(jobState, result.getQualityStatus() == DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(), Review Comment: `DATA_QUALITY_STATUS_METRIC_NAME` is defined as a counter, but adding 0 for failures doesn’t emit anything, so we lose visibility into failed flows completely. To fix this, please split into two explicit counters: one for success and one for failure to ensure both outcomes are observable and can be used to setup alerts. 
########## gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java: ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.quality; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.OpenTelemetryMetrics; +import org.apache.gobblin.metrics.OpenTelemetryMetricsBase; +import org.apache.gobblin.metrics.ServiceMetricNames; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.qualitychecker.DataQualityStatus; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.service.ServiceConfigKeys; + +/** + * Evaluates data quality for a set of task states and emits relevant metrics. + * This is a stateless utility class. 
+ */ +@Slf4j +public class DataQualityEvaluator { + + private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = "gobblin.gaas.observability"; + + // Private constructor to prevent instantiation + private DataQualityEvaluator() {} + + /** + * Result of a data quality evaluation containing the overall status and metrics. + */ + @Getter + public static class DataQualityEvaluationResult { + private final DataQualityStatus qualityStatus; + private final int totalFiles; + private final int passedFiles; + private final int failedFiles; + // Number of files that were not evaluated for data quality for example files not found or not processed + private final int nonEvaluatedFiles; + + public DataQualityEvaluationResult(DataQualityStatus qualityStatus, int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) { + this.qualityStatus = qualityStatus; + this.totalFiles = totalFiles; + this.passedFiles = passedFiles; + this.failedFiles = failedFiles; + this.nonEvaluatedFiles = nonEvaluatedFiles; + } + } + + /** + * Evaluates the data quality of a dataset state and stores the result. + * This method is specifically designed for dataset-level quality evaluation. + * + * @param datasetState The dataset state to evaluate and update + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState jobState) { + List<TaskState> taskStates = datasetState.getTaskStates(); + DataQualityEvaluationResult result = evaluateDataQuality(taskStates, jobState); + + // Store the result in the dataset state + jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, result.getQualityStatus().name()); + // Emit dataset-specific metrics + emitMetrics(jobState, result.getQualityStatus() == DataQualityStatus.PASSED? 
1 : 0, result.getTotalFiles(), + result.getPassedFiles(), result.getFailedFiles(), result.nonEvaluatedFiles, datasetState.getDatasetUrn()); + + return result; + } + + /** + * Evaluates the data quality of a set of task states and emits relevant metrics. + * + * @param taskStates List of task states to evaluate + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateDataQuality(List<TaskState> taskStates, JobState jobState) { + DataQualityStatus jobDataQuality = DataQualityStatus.PASSED; + int totalFiles = 0; + int failedFilesSize = 0; + int passedFilesSize = 0; + int nonEvaluatedFilesSize = 0; + + for (TaskState taskState : taskStates) { + totalFiles++; + + // Handle null task states gracefully + if (taskState == null) { + log.warn("Encountered null task state, skipping data quality evaluation for this task"); + nonEvaluatedFilesSize++; Review Comment: When can `taskState` be null? Should that be counted as a DQ failure rather than as not evaluated? ########## gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java: ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.quality; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.OpenTelemetryMetrics; +import org.apache.gobblin.metrics.OpenTelemetryMetricsBase; +import org.apache.gobblin.metrics.ServiceMetricNames; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.qualitychecker.DataQualityStatus; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.service.ServiceConfigKeys; + +/** + * Evaluates data quality for a set of task states and emits relevant metrics. + * This is a stateless utility class. + */ +@Slf4j +public class DataQualityEvaluator { + + private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = "gobblin.gaas.observability"; + + // Private constructor to prevent instantiation + private DataQualityEvaluator() {} + + /** + * Result of a data quality evaluation containing the overall status and metrics. 
+ */ + @Getter + public static class DataQualityEvaluationResult { + private final DataQualityStatus qualityStatus; + private final int totalFiles; + private final int passedFiles; + private final int failedFiles; + // Number of files that were not evaluated for data quality for example files not found or not processed + private final int nonEvaluatedFiles; + + public DataQualityEvaluationResult(DataQualityStatus qualityStatus, int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) { + this.qualityStatus = qualityStatus; + this.totalFiles = totalFiles; + this.passedFiles = passedFiles; + this.failedFiles = failedFiles; + this.nonEvaluatedFiles = nonEvaluatedFiles; + } + } + + /** + * Evaluates the data quality of a dataset state and stores the result. + * This method is specifically designed for dataset-level quality evaluation. + * + * @param datasetState The dataset state to evaluate and update + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState jobState) { + List<TaskState> taskStates = datasetState.getTaskStates(); + DataQualityEvaluationResult result = evaluateDataQuality(taskStates, jobState); + + // Store the result in the dataset state + jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, result.getQualityStatus().name()); + // Emit dataset-specific metrics + emitMetrics(jobState, result.getQualityStatus() == DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(), + result.getPassedFiles(), result.getFailedFiles(), result.nonEvaluatedFiles, datasetState.getDatasetUrn()); + + return result; + } + + /** + * Evaluates the data quality of a set of task states and emits relevant metrics. 
+ * + * @param taskStates List of task states to evaluate + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateDataQuality(List<TaskState> taskStates, JobState jobState) { + DataQualityStatus jobDataQuality = DataQualityStatus.PASSED; + int totalFiles = 0; + int failedFilesSize = 0; + int passedFilesSize = 0; + int nonEvaluatedFilesSize = 0; + + for (TaskState taskState : taskStates) { + totalFiles++; + + // Handle null task states gracefully + if (taskState == null) { + log.warn("Encountered null task state, skipping data quality evaluation for this task"); + nonEvaluatedFilesSize++; + continue; + } + + DataQualityStatus taskDataQuality = null; + String result = taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY); + taskDataQuality = DataQualityStatus.fromString(result); + if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) { + log.debug("Data quality status of this task is: " + taskDataQuality); + if (DataQualityStatus.PASSED == taskDataQuality) { + passedFilesSize++; + } else if (DataQualityStatus.FAILED == taskDataQuality){ + failedFilesSize++; + jobDataQuality = DataQualityStatus.FAILED; + } + } else { Review Comment: In which cases would `taskDataQuality` be `NOT_EVALUATED`? ########## gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java: ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.quality; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.OpenTelemetryMetrics; +import org.apache.gobblin.metrics.OpenTelemetryMetricsBase; +import org.apache.gobblin.metrics.ServiceMetricNames; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.qualitychecker.DataQualityStatus; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.service.ServiceConfigKeys; + +/** + * Evaluates data quality for a set of task states and emits relevant metrics. + * This is a stateless utility class. + */ +@Slf4j +public class DataQualityEvaluator { + + private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = "gobblin.gaas.observability"; + + // Private constructor to prevent instantiation + private DataQualityEvaluator() {} + + /** + * Result of a data quality evaluation containing the overall status and metrics. 
+ */ + @Getter + public static class DataQualityEvaluationResult { + private final DataQualityStatus qualityStatus; + private final int totalFiles; + private final int passedFiles; + private final int failedFiles; + // Number of files that were not evaluated for data quality for example files not found or not processed + private final int nonEvaluatedFiles; + + public DataQualityEvaluationResult(DataQualityStatus qualityStatus, int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) { + this.qualityStatus = qualityStatus; + this.totalFiles = totalFiles; + this.passedFiles = passedFiles; + this.failedFiles = failedFiles; + this.nonEvaluatedFiles = nonEvaluatedFiles; + } + } + + /** + * Evaluates the data quality of a dataset state and stores the result. + * This method is specifically designed for dataset-level quality evaluation. + * + * @param datasetState The dataset state to evaluate and update + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState jobState) { + List<TaskState> taskStates = datasetState.getTaskStates(); + DataQualityEvaluationResult result = evaluateDataQuality(taskStates, jobState); + + // Store the result in the dataset state + jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, result.getQualityStatus().name()); + // Emit dataset-specific metrics + emitMetrics(jobState, result.getQualityStatus() == DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(), + result.getPassedFiles(), result.getFailedFiles(), result.nonEvaluatedFiles, datasetState.getDatasetUrn()); + + return result; + } + + /** + * Evaluates the data quality of a set of task states and emits relevant metrics. 
+ * + * @param taskStates List of task states to evaluate + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateDataQuality(List<TaskState> taskStates, JobState jobState) { + DataQualityStatus jobDataQuality = DataQualityStatus.PASSED; + int totalFiles = 0; + int failedFilesSize = 0; + int passedFilesSize = 0; + int nonEvaluatedFilesSize = 0; + + for (TaskState taskState : taskStates) { + totalFiles++; + + // Handle null task states gracefully + if (taskState == null) { + log.warn("Encountered null task state, skipping data quality evaluation for this task"); + nonEvaluatedFilesSize++; + continue; + } + + DataQualityStatus taskDataQuality = null; + String result = taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY); + taskDataQuality = DataQualityStatus.fromString(result); + if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) { + log.debug("Data quality status of this task is: " + taskDataQuality); + if (DataQualityStatus.PASSED == taskDataQuality) { + passedFilesSize++; + } else if (DataQualityStatus.FAILED == taskDataQuality){ + failedFilesSize++; + jobDataQuality = DataQualityStatus.FAILED; + } Review Comment: add a warn log when DataQualityStatus.fromString() returns UNKNOWN ########## gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.policies.size; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy; + +/** + * A task-level policy that checks if the bytes read matches the bytes written for a file copy operation. + */ +@Slf4j +public class FileSizePolicy extends TaskLevelPolicy { + + public static final String COPY_PREFIX = "gobblin.copy"; + public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead"; + public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten"; + + public FileSizePolicy(State state, TaskLevelPolicy.Type type) { + super(state, type); + } + + @Override + public Result executePolicy() { + TransferBytes bytes = getBytesReadAndWritten(this.state); + if (bytes == null) { + return Result.FAILED; + } + Long bytesRead = bytes.getBytesRead(); + Long bytesWritten = bytes.getBytesWritten(); + + if(bytesRead == null || bytesWritten == null) { Review Comment: We should avoid `new TransferBytes(null, null)`, as it creates a seemingly valid object that requires downstream null checks. We can instead log any error within `getBytesReadAndWritten` and return an `Optional.empty()` for any failure case. ########## gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java: ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.quality; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.OpenTelemetryMetrics; +import org.apache.gobblin.metrics.OpenTelemetryMetricsBase; +import org.apache.gobblin.metrics.ServiceMetricNames; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.qualitychecker.DataQualityStatus; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.service.ServiceConfigKeys; + +/** + * Evaluates data quality for a set of task states and emits relevant metrics. + * This is a stateless utility class. 
+ */ +@Slf4j +public class DataQualityEvaluator { + + private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = "gobblin.gaas.observability"; + + // Private constructor to prevent instantiation + private DataQualityEvaluator() {} + + /** + * Result of a data quality evaluation containing the overall status and metrics. + */ + @Getter + public static class DataQualityEvaluationResult { + private final DataQualityStatus qualityStatus; + private final int totalFiles; + private final int passedFiles; + private final int failedFiles; + // Number of files that were not evaluated for data quality for example files not found or not processed + private final int nonEvaluatedFiles; + + public DataQualityEvaluationResult(DataQualityStatus qualityStatus, int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) { + this.qualityStatus = qualityStatus; + this.totalFiles = totalFiles; + this.passedFiles = passedFiles; + this.failedFiles = failedFiles; + this.nonEvaluatedFiles = nonEvaluatedFiles; + } + } + + /** + * Evaluates the data quality of a dataset state and stores the result. + * This method is specifically designed for dataset-level quality evaluation. + * + * @param datasetState The dataset state to evaluate and update + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState jobState) { + List<TaskState> taskStates = datasetState.getTaskStates(); + DataQualityEvaluationResult result = evaluateDataQuality(taskStates, jobState); + + // Store the result in the dataset state + jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, result.getQualityStatus().name()); + // Emit dataset-specific metrics + emitMetrics(jobState, result.getQualityStatus() == DataQualityStatus.PASSED? 
1 : 0, result.getTotalFiles(), Review Comment: The inline condition (status == PASSED ? 1 : 0) leaks metric-encoding logic into the caller. We can move this into the DataQualityStatus enum (e.g. a toMetricValue() method) or encapsulate it inside emitMetrics(). ########## gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java: ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gobblin.quality; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.OpenTelemetryMetrics; +import org.apache.gobblin.metrics.OpenTelemetryMetricsBase; +import org.apache.gobblin.metrics.ServiceMetricNames; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.qualitychecker.DataQualityStatus; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.service.ServiceConfigKeys; + +/** + * Evaluates data quality for a set of task states and emits relevant metrics. + * This is a stateless utility class. + */ +@Slf4j +public class DataQualityEvaluator { + + private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = "gobblin.gaas.observability"; + + // Private constructor to prevent instantiation + private DataQualityEvaluator() {} + + /** + * Result of a data quality evaluation containing the overall status and metrics. 
+ */ + @Getter + public static class DataQualityEvaluationResult { + private final DataQualityStatus qualityStatus; + private final int totalFiles; + private final int passedFiles; + private final int failedFiles; + // Number of files that were not evaluated for data quality for example files not found or not processed + private final int nonEvaluatedFiles; + + public DataQualityEvaluationResult(DataQualityStatus qualityStatus, int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) { + this.qualityStatus = qualityStatus; + this.totalFiles = totalFiles; + this.passedFiles = passedFiles; + this.failedFiles = failedFiles; + this.nonEvaluatedFiles = nonEvaluatedFiles; + } + } + + /** + * Evaluates the data quality of a dataset state and stores the result. + * This method is specifically designed for dataset-level quality evaluation. + * + * @param datasetState The dataset state to evaluate and update + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState jobState) { + List<TaskState> taskStates = datasetState.getTaskStates(); + DataQualityEvaluationResult result = evaluateDataQuality(taskStates, jobState); + + // Store the result in the dataset state + jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, result.getQualityStatus().name()); + // Emit dataset-specific metrics + emitMetrics(jobState, result.getQualityStatus() == DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(), + result.getPassedFiles(), result.getFailedFiles(), result.nonEvaluatedFiles, datasetState.getDatasetUrn()); + + return result; + } + + /** + * Evaluates the data quality of a set of task states and emits relevant metrics. 
+ * + * @param taskStates List of task states to evaluate + * @param jobState The job state containing additional context + * @return DataQualityEvaluationResult containing the evaluation results + */ + public static DataQualityEvaluationResult evaluateDataQuality(List<TaskState> taskStates, JobState jobState) { + DataQualityStatus jobDataQuality = DataQualityStatus.PASSED; Review Comment: Should we rename the variable to `jobDataQualityStatus`? Issue Time Tracking ------------------- Worklog Id: (was: 975539) Time Spent: 1h (was: 50m) > FileSize Data Quality implementation for FileBasedCopy > ------------------------------------------------------ > > Key: GOBBLIN-2204 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2204 > Project: Apache Gobblin > Issue Type: Task > Reporter: Vaibhav Singhal > Priority: Major > Time Spent: 1h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.20.10#820010)