khandelwal-prateek commented on code in PR #4113: URL: https://github.com/apache/gobblin/pull/4113#discussion_r2135036625
########## gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.policies.size; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy; + +/** + * A task-level policy that checks if the bytes read matches the bytes written for a file copy operation. + */ +@Slf4j +public class FileSizePolicy extends TaskLevelPolicy { + + public static final String COPY_PREFIX = "gobblin.copy"; + public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead"; + public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten"; + + private final Long bytesRead; + private final Long bytesWritten; + + public FileSizePolicy(State state, TaskLevelPolicy.Type type) { + super(state, type); + String bytesReadString = state.getProp(BYTES_READ_KEY); + String bytesWrittenString = state.getProp(BYTES_WRITTEN_KEY); + this.bytesRead = bytesReadString == null ? null : Long.parseLong(bytesReadString); + this.bytesWritten = bytesWrittenString == null ? 
null : Long.parseLong(bytesWrittenString); + } + + @Override + public Result executePolicy() { + if(this.bytesRead == null || this.bytesWritten == null) { + log.error("No bytes read or bytes written for this request"); Review Comment: it would be useful to log both values here, to pinpoint which one was missing (in case only one is) ########## gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.policies.size; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy; + +/** + * A task-level policy that checks if the bytes read matches the bytes written for a file copy operation. 
+ */ +@Slf4j +public class FileSizePolicy extends TaskLevelPolicy { + + public static final String COPY_PREFIX = "gobblin.copy"; + public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead"; + public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten"; + + private final Long bytesRead; + private final Long bytesWritten; + + public FileSizePolicy(State state, TaskLevelPolicy.Type type) { + super(state, type); + String bytesReadString = state.getProp(BYTES_READ_KEY); + String bytesWrittenString = state.getProp(BYTES_WRITTEN_KEY); + this.bytesRead = bytesReadString == null ? null : Long.parseLong(bytesReadString); Review Comment: `Long.parseLong` doesn't guard against non-numeric values and can throw `NumberFormatException`. We can move this parsing to `executePolicy`, which avoids instantiation failure due to `NumberFormatException` and improves error locality ########## gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/DataQualityStatus.java: ########## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.qualitychecker; + +/** + * An enumeration for possible statuses for Data quality checks. 
+ * Its values will be: + * - PASSED: When all data quality checks pass + * - FAILED: When any data quality check fails + * - NOT_EVALUATED: When data quality check evaluation is not performed + */ +public enum DataQualityStatus { + PASSED, + FAILED, + NOT_EVALUATED, + UNKNOWN Review Comment: how/when are we using this status? ########## gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.policies.size; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy; + +/** + * A task-level policy that checks if the bytes read matches the bytes written for a file copy operation. 
+ */ +@Slf4j +public class FileSizePolicy extends TaskLevelPolicy { + + public static final String COPY_PREFIX = "gobblin.copy"; + public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead"; + public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten"; + + private final Long bytesRead; + private final Long bytesWritten; + + public FileSizePolicy(State state, TaskLevelPolicy.Type type) { + super(state, type); + String bytesReadString = state.getProp(BYTES_READ_KEY); + String bytesWrittenString = state.getProp(BYTES_WRITTEN_KEY); + this.bytesRead = bytesReadString == null ? null : Long.parseLong(bytesReadString); + this.bytesWritten = bytesWrittenString == null ? null : Long.parseLong(bytesWrittenString); + } + + @Override + public Result executePolicy() { + if(this.bytesRead == null || this.bytesWritten == null) { + log.error("No bytes read or bytes written for this request"); + return Result.FAILED; + } + double sizeDifference = Math.abs(this.bytesRead - this.bytesWritten); Review Comment: Please use long instead of double for `sizeDifference`, since both bytesRead and bytesWritten are long values ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java: ########## @@ -788,12 +800,108 @@ public int getJobFailures() { return Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY)); } + /** + * Computes and stores the overall data quality status based on task-level policy results. + * The status will be "PASSED" if all tasks passed their quality checks, "FAILED" otherwise. 
+ */ + public void computeAndStoreDatasetQualityStatus(JobState jobState) { Review Comment: as we are emitting metrics here, we can rename this method to use `emit`/`report` in place of `store` ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java: ########## @@ -788,12 +800,108 @@ public int getJobFailures() { return Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY)); } + /** + * Computes and stores the overall data quality status based on task-level policy results. + * The status will be "PASSED" if all tasks passed their quality checks, "FAILED" otherwise. + */ + public void computeAndStoreDatasetQualityStatus(JobState jobState) { + DataQualityStatus jobDataQuality = DataQualityStatus.PASSED; + int totalFiles = 0; + int failedFilesSize = 0; + int passedFilesSize = 0; + for (TaskState taskState : getTaskStates()) { + totalFiles++; + DataQualityStatus qualityResult = null; + String result = taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY); + if (result != null) { + try { + qualityResult = DataQualityStatus.valueOf(result); + } catch (IllegalArgumentException e) { + log.warn("Unknown data quality status encountered " + result); + qualityResult = DataQualityStatus.UNKNOWN; + } + } + log.info("Data quality status of this task is: " + qualityResult); + if (DataQualityStatus.PASSED != qualityResult) { + failedFilesSize++; + log.warn("Data quality not passed: " + qualityResult); + jobDataQuality = DataQualityStatus.FAILED; + } + else { + passedFilesSize++; + } + } + + super.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, jobDataQuality.name()); + + // Emit OTEL metrics for data quality + OpenTelemetryMetricsBase otelMetrics = OpenTelemetryMetrics.getInstance(jobState); + if (otelMetrics != null) { + Attributes tags = getTagsForDataQualityMetrics(jobState); + // Emit data quality status (1 for PASSED, 0 for FAILED) + DataQualityStatus finalJobDataQuality = jobDataQuality; Review Comment: please use `final` since this 
should not change ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java: ########## @@ -788,12 +800,108 @@ public int getJobFailures() { return Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY)); } + /** + * Computes and stores the overall data quality status based on task-level policy results. + * The status will be "PASSED" if all tasks passed their quality checks, "FAILED" otherwise. + */ + public void computeAndStoreDatasetQualityStatus(JobState jobState) { + DataQualityStatus jobDataQuality = DataQualityStatus.PASSED; + int totalFiles = 0; + int failedFilesSize = 0; + int passedFilesSize = 0; + for (TaskState taskState : getTaskStates()) { + totalFiles++; + DataQualityStatus qualityResult = null; + String result = taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY); + if (result != null) { + try { + qualityResult = DataQualityStatus.valueOf(result); + } catch (IllegalArgumentException e) { + log.warn("Unknown data quality status encountered " + result); + qualityResult = DataQualityStatus.UNKNOWN; + } + } + log.info("Data quality status of this task is: " + qualityResult); + if (DataQualityStatus.PASSED != qualityResult) { + failedFilesSize++; + log.warn("Data quality not passed: " + qualityResult); + jobDataQuality = DataQualityStatus.FAILED; + } + else { + passedFilesSize++; + } + } + + super.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, jobDataQuality.name()); + + // Emit OTEL metrics for data quality + OpenTelemetryMetricsBase otelMetrics = OpenTelemetryMetrics.getInstance(jobState); + if (otelMetrics != null) { + Attributes tags = getTagsForDataQualityMetrics(jobState); + // Emit data quality status (1 for PASSED, 0 for FAILED) + DataQualityStatus finalJobDataQuality = jobDataQuality; + log.info("Data quality status for this job is " + finalJobDataQuality); + otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME) + .gaugeBuilder(ServiceMetricNames.DATA_QUALITY_STATUS_METRIC_NAME) + 
.ofLongs() + .buildWithCallback(measurement -> { + log.info("Emitting metric for data quality"); + measurement.record(DataQualityStatus.PASSED.equals(finalJobDataQuality) ? 1 : 0, tags); + }); + + otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME) + .counterBuilder(ServiceMetricNames.DATA_QUALITY_OVERALL_FILE_COUNT) + .build() + .add(totalFiles, tags); + // Emit passed files count + otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME) + .counterBuilder(ServiceMetricNames.DATA_QUALITY_SUCCESS_FILE_COUNT) + .build().add(passedFilesSize, tags); + // Emit failed files count + otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME) + .counterBuilder(ServiceMetricNames.DATA_QUALITY_FAILURE_FILE_COUNT) + .build().add(failedFilesSize, tags); + } + } + + private Attributes getTagsForDataQualityMetrics(JobState jobState) { + Properties jobProperties = new Properties(); + try { + jobProperties = PropertiesUtils.deserialize(jobState.getProp("job.props", "")); + log.info("Job properties loaded: " + jobProperties); + } catch (IOException e) { + log.error("Could not deserialize job properties", e); + } Review Comment: consider logging a clearer warning or adding a fallback tag/metric to indicate that job.props deserialization failed. Without this, OTEL metrics would silently lack important context like source/destination, making debugging or metric analysis harder. ########## gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.policies.size; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class FileSizePolicyTest { + + @Test + public void testPolicyPass() { + State state = new State(); + state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L); + state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 1000L); + + FileSizePolicy policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL); + Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.PASSED); + } + + @Test + public void testPolicyFail() { + State state = new State(); + state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L); + state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 900L); + + FileSizePolicy policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL); + Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED); + } + + @Test + public void testMissingProperties() { + State state = new State(); + // No properties set at all + FileSizePolicy policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL); + Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED); + } + + @Test + public void testPartiallySetProperties() { + State state = new State(); + // Only set bytes read, not bytes written + state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L); + + FileSizePolicy policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL); + Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED); + 
+ // Reset state and only set bytes written, not bytes read + state = new State(); + state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 1000L); + + policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL); + Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED); + } + +} Review Comment: nit: add a newline at end of file ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/IncorrectSizeFileAwareInputStreamDataWriter.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.writer; + +import java.io.IOException; +import java.io.InputStream; +import lombok.extern.slf4j.Slf4j; +import org.apache.hadoop.fs.Path; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.data.management.copy.CopyableFile; +import org.apache.gobblin.data.management.copy.CopyConfiguration; +import org.apache.gobblin.data.management.copy.FileAwareInputStream; +import org.apache.gobblin.policies.size.FileSizePolicy; +import org.apache.gobblin.writer.DataWriter; + +/** + * A {@link DataWriter} that extends {@link FileAwareInputStreamDataWriter} to intentionally report incorrect file sizes. 
+ * This is useful for testing data quality checks that verify file sizes. + * + * The writer actually writes the correct data to the destination, but reports incorrect sizes in the bytesWritten() method. + * The size discrepancy can be configured through properties: + * - gobblin.copy.incorrect.size.ratio: Ratio to multiply actual size by (default 1.0) + * - gobblin.copy.incorrect.size.offset: Fixed offset to add to actual size (default 0) + */ +@Slf4j +public class IncorrectSizeFileAwareInputStreamDataWriter extends FileAwareInputStreamDataWriter { + + public static final String INCORRECT_SIZE_RATIO_KEY = CopyConfiguration.COPY_PREFIX + ".incorrect.size.ratio"; + public static final String INCORRECT_SIZE_OFFSET_KEY = CopyConfiguration.COPY_PREFIX + ".incorrect.size.offset"; + public static final double DEFAULT_INCORRECT_SIZE_RATIO = 1.0; Review Comment: should the default be an actually incorrect ratio, such as `0.9`? Otherwise, we can clarify in the Javadoc that the default config does not introduce any discrepancy unless overridden ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java: ########## @@ -788,12 +800,108 @@ public int getJobFailures() { return Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY)); } + /** + * Computes and stores the overall data quality status based on task-level policy results. + * The status will be "PASSED" if all tasks passed their quality checks, "FAILED" otherwise. 
+ */ + public void computeAndStoreDatasetQualityStatus(JobState jobState) { + DataQualityStatus jobDataQuality = DataQualityStatus.PASSED; + int totalFiles = 0; + int failedFilesSize = 0; + int passedFilesSize = 0; + for (TaskState taskState : getTaskStates()) { + totalFiles++; + DataQualityStatus qualityResult = null; + String result = taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY); + if (result != null) { + try { + qualityResult = DataQualityStatus.valueOf(result); + } catch (IllegalArgumentException e) { + log.warn("Unknown data quality status encountered " + result); + qualityResult = DataQualityStatus.UNKNOWN; Review Comment: consider adding a `fromString()` static method in `DataQualityStatus` to centralize the parsing and fallback logic. This will simplify code and ensure consistent handling of unknown/malformed values.
```
public static DataQualityStatus fromString(String value) {
  if (value == null) {
    return UNKNOWN;
  }
  try {
    return DataQualityStatus.valueOf(value.toUpperCase());
  } catch (IllegalArgumentException e) {
    log.error(...);
    return UNKNOWN;
  }
}
```
########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java: ########## @@ -788,12 +800,108 @@ public int getJobFailures() { return Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY)); } + /** + * Computes and stores the overall data quality status based on task-level policy results. + * The status will be "PASSED" if all tasks passed their quality checks, "FAILED" otherwise. 
+ */ + public void computeAndStoreDatasetQualityStatus(JobState jobState) { + DataQualityStatus jobDataQuality = DataQualityStatus.PASSED; + int totalFiles = 0; + int failedFilesSize = 0; + int passedFilesSize = 0; + for (TaskState taskState : getTaskStates()) { + totalFiles++; + DataQualityStatus qualityResult = null; + String result = taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY); + if (result != null) { + try { + qualityResult = DataQualityStatus.valueOf(result); + } catch (IllegalArgumentException e) { + log.warn("Unknown data quality status encountered " + result); + qualityResult = DataQualityStatus.UNKNOWN; + } + } + log.info("Data quality status of this task is: " + qualityResult); + if (DataQualityStatus.PASSED != qualityResult) { + failedFilesSize++; + log.warn("Data quality not passed: " + qualityResult); + jobDataQuality = DataQualityStatus.FAILED; + } + else { + passedFilesSize++; + } + } + + super.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, jobDataQuality.name()); + + // Emit OTEL metrics for data quality + OpenTelemetryMetricsBase otelMetrics = OpenTelemetryMetrics.getInstance(jobState); + if (otelMetrics != null) { + Attributes tags = getTagsForDataQualityMetrics(jobState); + // Emit data quality status (1 for PASSED, 0 for FAILED) + DataQualityStatus finalJobDataQuality = jobDataQuality; + log.info("Data quality status for this job is " + finalJobDataQuality); + otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME) + .gaugeBuilder(ServiceMetricNames.DATA_QUALITY_STATUS_METRIC_NAME) + .ofLongs() + .buildWithCallback(measurement -> { + log.info("Emitting metric for data quality"); + measurement.record(DataQualityStatus.PASSED.equals(finalJobDataQuality) ? 
1 : 0, tags); + }); + + otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME) + .counterBuilder(ServiceMetricNames.DATA_QUALITY_OVERALL_FILE_COUNT) + .build() + .add(totalFiles, tags); + // Emit passed files count + otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME) + .counterBuilder(ServiceMetricNames.DATA_QUALITY_SUCCESS_FILE_COUNT) + .build().add(passedFilesSize, tags); + // Emit failed files count + otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME) + .counterBuilder(ServiceMetricNames.DATA_QUALITY_FAILURE_FILE_COUNT) + .build().add(failedFilesSize, tags); + } + } + + private Attributes getTagsForDataQualityMetrics(JobState jobState) { + Properties jobProperties = new Properties(); + try { + jobProperties = PropertiesUtils.deserialize(jobState.getProp("job.props", "")); + log.info("Job properties loaded: " + jobProperties); + } catch (IOException e) { + log.error("Could not deserialize job properties", e); + } + + return Attributes.builder() + .put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobState.getJobName()) + .put(TimingEvent.DATASET_URN, this.getDatasetUrn()) + .put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD)) + .put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD)) + .put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)) + .put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD)) + .put(TimingEvent.FlowEventConstants.FLOW_FABRIC ,jobState.getProp(ServiceConfigKeys.GOBBLIN_SERVICE_INSTANCE_NAME, null)) + .put(TimingEvent.FlowEventConstants.FLOW_SOURCE ,jobProperties.getProperty(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "")) + .put(TimingEvent.FlowEventConstants.FLOW_DESTINATION, jobProperties.getProperty(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, 
"")) + .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, jobState.getProp(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, "")) + .build(); + } + + /** + * Gets the overall data quality status of the dataset. + * @return "PASSED" if all tasks passed their quality checks, "FAILED" otherwise + */ + public String getDataQualityStatus() { Review Comment: can we return enum from here to avoid string logic downstream.. we can use `.name()` from calling methods, if it requires the string representation ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java: ########## @@ -788,12 +800,108 @@ public int getJobFailures() { return Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY)); } + /** + * Computes and stores the overall data quality status based on task-level policy results. + * The status will be "PASSED" if all tasks passed their quality checks, "FAILED" otherwise. + */ + public void computeAndStoreDatasetQualityStatus(JobState jobState) { + DataQualityStatus jobDataQuality = DataQualityStatus.PASSED; + int totalFiles = 0; + int failedFilesSize = 0; + int passedFilesSize = 0; + for (TaskState taskState : getTaskStates()) { + totalFiles++; + DataQualityStatus qualityResult = null; + String result = taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY); + if (result != null) { + try { + qualityResult = DataQualityStatus.valueOf(result); + } catch (IllegalArgumentException e) { + log.warn("Unknown data quality status encountered " + result); + qualityResult = DataQualityStatus.UNKNOWN; + } + } + log.info("Data quality status of this task is: " + qualityResult); Review Comment: this would log for each task.. 
can we use `log.debug` instead ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/IncorrectSizeFileAwareInputStreamDataWriter.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.writer; + +import java.io.IOException; +import java.io.InputStream; +import lombok.extern.slf4j.Slf4j; +import org.apache.hadoop.fs.Path; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.data.management.copy.CopyableFile; +import org.apache.gobblin.data.management.copy.CopyConfiguration; +import org.apache.gobblin.data.management.copy.FileAwareInputStream; +import org.apache.gobblin.policies.size.FileSizePolicy; +import org.apache.gobblin.writer.DataWriter; + +/** + * A {@link DataWriter} that extends {@link FileAwareInputStreamDataWriter} to intentionally report incorrect file sizes. + * This is useful for testing data quality checks that verify file sizes. + * + * The writer actually writes the correct data to the destination, but reports incorrect sizes in the bytesWritten() method. 
+ * The size discrepancy can be configured through properties: + * - gobblin.copy.incorrect.size.ratio: Ratio to multiply actual size by (default 1.0) + * - gobblin.copy.incorrect.size.offset: Fixed offset to add to actual size (default 0) + */ +@Slf4j +public class IncorrectSizeFileAwareInputStreamDataWriter extends FileAwareInputStreamDataWriter { + + public static final String INCORRECT_SIZE_RATIO_KEY = CopyConfiguration.COPY_PREFIX + ".incorrect.size.ratio"; + public static final String INCORRECT_SIZE_OFFSET_KEY = CopyConfiguration.COPY_PREFIX + ".incorrect.size.offset"; + public static final double DEFAULT_INCORRECT_SIZE_RATIO = 1.0; + public static final long DEFAULT_INCORRECT_SIZE_OFFSET = 0L; + + private final double sizeRatio; + private final long sizeOffset; + + public IncorrectSizeFileAwareInputStreamDataWriter(State state, int numBranches, int branchId) + throws IOException { + this(state, numBranches, branchId, null); + } + + public IncorrectSizeFileAwareInputStreamDataWriter(State state, int numBranches, int branchId, String writerAttemptId) + throws IOException { + super(state, numBranches, branchId, writerAttemptId); + this.sizeRatio = state.getPropAsDouble(INCORRECT_SIZE_RATIO_KEY, DEFAULT_INCORRECT_SIZE_RATIO); + this.sizeOffset = state.getPropAsLong(INCORRECT_SIZE_OFFSET_KEY, DEFAULT_INCORRECT_SIZE_OFFSET); Review Comment: should we check for negative values here? ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java: ########## @@ -788,12 +800,108 @@ public int getJobFailures() { return Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY)); } + /** + * Computes and stores the overall data quality status based on task-level policy results. + * The status will be "PASSED" if all tasks passed their quality checks, "FAILED" otherwise. 
+ */ + public void computeAndStoreDatasetQualityStatus(JobState jobState) { + DataQualityStatus jobDataQuality = DataQualityStatus.PASSED; + int totalFiles = 0; + int failedFilesSize = 0; + int passedFilesSize = 0; + for (TaskState taskState : getTaskStates()) { + totalFiles++; + DataQualityStatus qualityResult = null; + String result = taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY); + if (result != null) { + try { + qualityResult = DataQualityStatus.valueOf(result); + } catch (IllegalArgumentException e) { + log.warn("Unknown data quality status encountered " + result); + qualityResult = DataQualityStatus.UNKNOWN; + } + } + log.info("Data quality status of this task is: " + qualityResult); + if (DataQualityStatus.PASSED != qualityResult) { + failedFilesSize++; + log.warn("Data quality not passed: " + qualityResult); Review Comment: please add a task identifier for failed DQ task in the log ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java: ########## @@ -788,12 +800,108 @@ public int getJobFailures() { return Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY)); } + /** + * Computes and stores the overall data quality status based on task-level policy results. + * The status will be "PASSED" if all tasks passed their quality checks, "FAILED" otherwise. 
+ */ + public void computeAndStoreDatasetQualityStatus(JobState jobState) { + DataQualityStatus jobDataQuality = DataQualityStatus.PASSED; + int totalFiles = 0; + int failedFilesSize = 0; + int passedFilesSize = 0; + for (TaskState taskState : getTaskStates()) { + totalFiles++; + DataQualityStatus qualityResult = null; + String result = taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY); + if (result != null) { + try { + qualityResult = DataQualityStatus.valueOf(result); + } catch (IllegalArgumentException e) { + log.warn("Unknown data quality status encountered " + result); + qualityResult = DataQualityStatus.UNKNOWN; + } + } + log.info("Data quality status of this task is: " + qualityResult); + if (DataQualityStatus.PASSED != qualityResult) { + failedFilesSize++; + log.warn("Data quality not passed: " + qualityResult); + jobDataQuality = DataQualityStatus.FAILED; + } + else { + passedFilesSize++; + } + } + + super.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, jobDataQuality.name()); + + // Emit OTEL metrics for data quality + OpenTelemetryMetricsBase otelMetrics = OpenTelemetryMetrics.getInstance(jobState); + if (otelMetrics != null) { + Attributes tags = getTagsForDataQualityMetrics(jobState); + // Emit data quality status (1 for PASSED, 0 for FAILED) + DataQualityStatus finalJobDataQuality = jobDataQuality; + log.info("Data quality status for this job is " + finalJobDataQuality); + otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME) + .gaugeBuilder(ServiceMetricNames.DATA_QUALITY_STATUS_METRIC_NAME) + .ofLongs() + .buildWithCallback(measurement -> { + log.info("Emitting metric for data quality"); + measurement.record(DataQualityStatus.PASSED.equals(finalJobDataQuality) ? 
1 : 0, tags); + }); + + otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME) + .counterBuilder(ServiceMetricNames.DATA_QUALITY_OVERALL_FILE_COUNT) + .build() + .add(totalFiles, tags); + // Emit passed files count + otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME) + .counterBuilder(ServiceMetricNames.DATA_QUALITY_SUCCESS_FILE_COUNT) + .build().add(passedFilesSize, tags); + // Emit failed files count + otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME) + .counterBuilder(ServiceMetricNames.DATA_QUALITY_FAILURE_FILE_COUNT) + .build().add(failedFilesSize, tags); Review Comment: we can combine the three counter emissions into a single `.batchCallback()` to improve efficiency and avoid duplication ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java: ########## @@ -788,12 +800,108 @@ public int getJobFailures() { return Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY)); } + /** + * Computes and stores the overall data quality status based on task-level policy results. + * The status will be "PASSED" if all tasks passed their quality checks, "FAILED" otherwise. + */ + public void computeAndStoreDatasetQualityStatus(JobState jobState) { Review Comment: this method introduces metrics emission and data quality evaluation logic directly into JobState, which is primarily a data container for job/task state. To improve separation of concerns, please move this logic into a higher-level component e.g. a DataQualityEvaluator/JobResultHandler
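The suggestions in this review to centralize status parsing in `fromString()`, return the enum rather than a string, and move the evaluation out of `JobState` could fit together roughly as follows. This is only a minimal, self-contained sketch and not the PR's actual code: the enum mirrors `DataQualityStatus` from the diff, while `DataQualityEvaluatorSketch`, `Summary`, and `evaluate` are hypothetical names, the per-task results are passed in as plain strings instead of being read from `TaskState`, and metrics emission is left out entirely.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-in for the suggested DataQualityEvaluator; not part of Gobblin.
public class DataQualityEvaluatorSketch {

  // Mirrors the enum in the PR; fromString follows the reviewer's suggestion.
  enum DataQualityStatus {
    PASSED, FAILED, NOT_EVALUATED, UNKNOWN;

    static DataQualityStatus fromString(String value) {
      if (value == null) {
        return UNKNOWN;
      }
      try {
        // Locale.ROOT is an addition here, to keep upper-casing locale independent.
        return DataQualityStatus.valueOf(value.toUpperCase(Locale.ROOT));
      } catch (IllegalArgumentException e) {
        return UNKNOWN;
      }
    }
  }

  // Hypothetical immutable holder for the job-level outcome.
  static final class Summary {
    final DataQualityStatus status;
    final int totalFiles;
    final int passedFiles;
    final int failedFiles;

    Summary(DataQualityStatus status, int totalFiles, int passedFiles, int failedFiles) {
      this.status = status;
      this.totalFiles = totalFiles;
      this.passedFiles = passedFiles;
      this.failedFiles = failedFiles;
    }
  }

  // Anything other than PASSED (including a missing or unparsable value) counts
  // as not passed, matching the aggregation in the quoted diff.
  static Summary evaluate(List<String> taskResults) {
    int passed = 0;
    int failed = 0;
    for (String raw : taskResults) {
      if (DataQualityStatus.fromString(raw) == DataQualityStatus.PASSED) {
        passed++;
      } else {
        failed++;
      }
    }
    DataQualityStatus status = failed == 0 ? DataQualityStatus.PASSED : DataQualityStatus.FAILED;
    return new Summary(status, taskResults.size(), passed, failed);
  }

  public static void main(String[] args) {
    Summary s = evaluate(Arrays.asList("PASSED", "passed", null, "bogus"));
    // prints "FAILED total=4 passed=2 failed=2"
    System.out.println(s.status + " total=" + s.totalFiles
        + " passed=" + s.passedFiles + " failed=" + s.failedFiles);
  }
}
```

With this shape, callers that need the string form can call `.name()` on the returned status, and `JobState` would only need to store the result rather than compute it.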