khandelwal-prateek commented on code in PR #4113: URL: https://github.com/apache/gobblin/pull/4113#discussion_r2094602080
########## gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyChecker.java: ########## @@ -40,19 +41,29 @@ public enum DataQualityStatus { NOT_EVALUATED } private final List<TaskLevelPolicy> list; + private final State state; private static final Logger LOG = LoggerFactory.getLogger(TaskLevelPolicyChecker.class); - public TaskLevelPolicyChecker(List<TaskLevelPolicy> list) { + public static final String TASK_LEVEL_POLICY_RESULT_KEY = "gobblin.task.level.policy.result"; + + public TaskLevelPolicyChecker(List<TaskLevelPolicy> list, State state) { this.list = list; + this.state = state; } public TaskLevelPolicyCheckResults executePolicies() { TaskLevelPolicyCheckResults results = new TaskLevelPolicyCheckResults(); + boolean allRequiredPoliciesPassed = true; for (TaskLevelPolicy p : this.list) { TaskLevelPolicy.Result result = p.executePolicy(); results.getPolicyResults().put(result, p.getType()); + if(TaskLevelPolicy.Type.FAIL.equals(p.getType()) && TaskLevelPolicy.Result.FAILED.name().equals(result.name())){ Review Comment: nit: add space after if - `if (` and before { - ` {` ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java: ########## @@ -48,9 +48,12 @@ public boolean isEmpty() { return throwables.isEmpty(); } - public ForkException getAggregatedException (List<Integer> failedForkIds, String taskId) { + public ForkException getAggregatedException (List<Integer> failedForkIds, String taskId, String taskDataQuality) { StringBuffer stringBuffer = new StringBuffer(); stringBuffer.append("Fork branches " + failedForkIds + " failed for task " + taskId + "\n"); + if(taskDataQuality!=null && !"PASSED".equals(taskDataQuality)){ Review Comment: let's avoid using hardcoded strings like "PASSED".. 
please create a constant or use an enum for data quality status ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java: ########## @@ -90,7 +91,8 @@ public void onTaskRunCompletion(Task task) { // Check the task state and handle task retry if task failed and // it has not reached the maximum number of retries WorkUnitState.WorkingState state = task.getTaskState().getWorkingState(); - if (state == WorkUnitState.WorkingState.FAILED && task.getRetryCount() < this.maxTaskRetries) { + String dataQualityResult = task.getTaskState().getProp(TaskLevelPolicyChecker.TASK_LEVEL_POLICY_RESULT_KEY); + if (TaskLevelPolicyChecker.DataQualityStatus.FAILED.name().equals(dataQualityResult) || state == WorkUnitState.WorkingState.FAILED && task.getRetryCount() < this.maxTaskRetries) { Review Comment: I think we need to retry on either a data quality failure or a task failure, but only until the retry count is reached. However, since `&&` has higher precedence than `||`, this condition would be interpreted as `if ((DataQualityStatus == FAILED) || (state == FAILED && retryCount < max))`, and it would be stuck in an infinite retry loop if DataQualityStatus is FAILED. Please add parentheses so that the intended logic runs, i.e. `if ((DataQualityStatus == FAILED || state == FAILED) && retryCount < max)` ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/IncorrectSizeFileAwareInputStreamDataWriter.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.writer; + +import java.io.IOException; +import java.io.InputStream; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.data.management.copy.CopyableFile; +import org.apache.gobblin.data.management.copy.FileAwareInputStream; +import org.apache.gobblin.policies.size.FileSizePolicy; +import org.apache.gobblin.writer.DataWriter; +import org.apache.hadoop.fs.Path; + +/** + * A {@link DataWriter} that extends {@link FileAwareInputStreamDataWriter} to intentionally report incorrect file sizes. + * This is useful for testing data quality checks that verify file sizes. + * + * The writer actually writes the correct data to the destination, but reports incorrect sizes in the bytesWritten() method. 
+ * The size discrepancy can be configured through properties: + * - gobblin.copy.incorrect.size.ratio: Ratio to multiply actual size by (default 1.0) Review Comment: the default value used is `0.9` actually ########## gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyChecker.java: ########## @@ -40,19 +41,29 @@ public enum DataQualityStatus { NOT_EVALUATED } private final List<TaskLevelPolicy> list; + private final State state; private static final Logger LOG = LoggerFactory.getLogger(TaskLevelPolicyChecker.class); - public TaskLevelPolicyChecker(List<TaskLevelPolicy> list) { + public static final String TASK_LEVEL_POLICY_RESULT_KEY = "gobblin.task.level.policy.result"; + + public TaskLevelPolicyChecker(List<TaskLevelPolicy> list, State state) { this.list = list; + this.state = state; } public TaskLevelPolicyCheckResults executePolicies() { TaskLevelPolicyCheckResults results = new TaskLevelPolicyCheckResults(); + boolean allRequiredPoliciesPassed = true; for (TaskLevelPolicy p : this.list) { TaskLevelPolicy.Result result = p.executePolicy(); results.getPolicyResults().put(result, p.getType()); + if(TaskLevelPolicy.Type.FAIL.equals(p.getType()) && TaskLevelPolicy.Result.FAILED.name().equals(result.name())){ + allRequiredPoliciesPassed = false; + } LOG.info("TaskLevelPolicy " + p + " of type " + p.getType() + " executed with result " + result); } + state.setProp(TASK_LEVEL_POLICY_RESULT_KEY, + allRequiredPoliciesPassed ? DataQualityStatus.PASSED.name() : DataQualityStatus.FAILED.name()); Review Comment: `TaskLevelPolicyChecker` is a generic policy executor.. this change tightly couples it to data quality evaluation by writing DataQualityStatus into a `State` variable. Please move this logic to a higher level component(i.e. caller), so that this class is responsible for policy execution only. 
########## gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java: ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.policies.size; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class FileSizePolicyTest { + + @Test + public void testPolicyPass() { + State state = new State(); + state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L); + state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 1000L); + + FileSizePolicy policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL); + Assert.assertTrue(policy.executePolicy().equals(TaskLevelPolicy.Result.PASSED)); Review Comment: use `Assert.assertEquals` ########## gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java: ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.policies.size; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy; + +/** + * A task-level policy that checks if the bytes read matches the bytes written for a file copy operation. + */ +public class FileSizePolicy extends TaskLevelPolicy { + private static final Logger LOG = LoggerFactory.getLogger(FileSizePolicy.class); + + public static final String BYTES_READ_KEY = "gobblin.copy.bytesRead"; + public static final String BYTES_WRITTEN_KEY = "gobblin.copy.bytesWritten"; + + private final long bytesRead; + private final long bytesWritten; + + public FileSizePolicy(State state, TaskLevelPolicy.Type type) { + super(state, type); + this.bytesRead = state.getPropAsLong(BYTES_READ_KEY, 0); + this.bytesWritten = state.getPropAsLong(BYTES_WRITTEN_KEY, 0); + } + + @Override + public Result executePolicy() { + double sizeDifference = Math.abs(this.bytesRead - this.bytesWritten); + + if (sizeDifference == 0) { + return Result.PASSED; + } + + LOG.warn("File size check failed - bytes read: {}, bytes written: {}, difference: {}", + this.bytesRead, this.bytesWritten, sizeDifference); + return Result.FAILED; + } + + @Override + public String toString() { + return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]", + this.bytesRead, 
this.bytesWritten); + } + +} Review Comment: nit: add newline at end of file in all new files ########## gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java: ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.policies.size; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy; + +/** + * A task-level policy that checks if the bytes read matches the bytes written for a file copy operation. 
+ */ +public class FileSizePolicy extends TaskLevelPolicy { + private static final Logger LOG = LoggerFactory.getLogger(FileSizePolicy.class); + + public static final String BYTES_READ_KEY = "gobblin.copy.bytesRead"; + public static final String BYTES_WRITTEN_KEY = "gobblin.copy.bytesWritten"; + + private final long bytesRead; + private final long bytesWritten; + + public FileSizePolicy(State state, TaskLevelPolicy.Type type) { + super(state, type); + this.bytesRead = state.getPropAsLong(BYTES_READ_KEY, 0); + this.bytesWritten = state.getPropAsLong(BYTES_WRITTEN_KEY, 0); Review Comment: if the props are missing, defaulting to 0 might hide the actual issue and cause false successes/failures. Is there a case where we expect the default value to be used? If not, let's validate that both props are explicitly set ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java: ########## @@ -308,6 +312,8 @@ public boolean apply(FileStatus input) { os.close(); log.info("OutputStream for file {} is closed.", writeAt); inputStream.close(); + long actualFileSize = this.fs.getFileStatus(writeAt).getLen(); + this.state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, actualFileSize); Review Comment: `fs.getFileStatus` can throw an IOException. It would be better to move this to a helper method, e.g. `recordBytesWritten`, which can also log the exception ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java: ########## @@ -788,12 +798,60 @@ public int getJobFailures() { return Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY)); } + /** + * Computes and stores the overall data quality status based on task-level policy results. + * The status will be "PASSED" if all tasks passed their quality checks, "FAILED" otherwise. 
+ */ + public void computeAndStoreQualityStatus(JobState jobState) { + TaskLevelPolicyChecker.DataQualityStatus jobDataQuality = TaskLevelPolicyChecker.DataQualityStatus.PASSED; + + for (TaskState taskState : getTaskStates()) { + String qualityResult = taskState.getProp(TaskLevelPolicyChecker.TASK_LEVEL_POLICY_RESULT_KEY); + log.info("Data quality status of this task is: " + qualityResult); + if (qualityResult != null && !TaskLevelPolicyChecker.DataQualityStatus.PASSED.name().equals(qualityResult)) { + log.info("Data quality not passed: " + qualityResult); Review Comment: `log.warn` ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/IncorrectSizeFileAwareInputStreamDataWriterBuilder.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.data.management.copy.writer; + +import java.io.IOException; +import org.apache.commons.lang.StringUtils; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.data.management.copy.FileAwareInputStream; +import org.apache.gobblin.writer.DataWriter; +import org.apache.gobblin.writer.DataWriterBuilder; +import org.apache.hadoop.fs.Path; + +/** + * Builder for {@link IncorrectSizeFileAwareInputStreamDataWriter}. + */ +public class IncorrectSizeFileAwareInputStreamDataWriterBuilder extends DataWriterBuilder<String, FileAwareInputStream> { + + @Override + public final DataWriter<FileAwareInputStream> build() throws IOException { + setJobSpecificOutputPaths(this.destination.getProperties()); + this.destination.getProperties().setProp(ConfigurationKeys.WRITER_FILE_PATH, this.writerId); + return buildWriter(); + } + + protected DataWriter<FileAwareInputStream> buildWriter() throws IOException { + return new IncorrectSizeFileAwareInputStreamDataWriter(this.destination.getProperties(), this.branches, this.branch, this.writerAttemptId); + } + + /** + * Each job gets its own task-staging and task-output directory. Update the staging and output directories to + * contain job_id. This is to make sure uncleaned data from previous execution does not corrupt final published data + * produced by this execution. 
+ */ + public synchronized static void setJobSpecificOutputPaths(State state) { + if (!StringUtils.containsIgnoreCase(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR), + state.getProp(ConfigurationKeys.JOB_ID_KEY))) { + + state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR), + state.getProp(ConfigurationKeys.JOB_ID_KEY))); + state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR), + state.getProp(ConfigurationKeys.JOB_ID_KEY))); + } + } +} Review Comment: nit: add a newline at the end of the file ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java: ########## @@ -788,12 +798,60 @@ public int getJobFailures() { return Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY)); } + /** + * Computes and stores the overall data quality status based on task-level policy results. + * The status will be "PASSED" if all tasks passed their quality checks, "FAILED" otherwise. 
+ */ + public void computeAndStoreQualityStatus(JobState jobState) { + TaskLevelPolicyChecker.DataQualityStatus jobDataQuality = TaskLevelPolicyChecker.DataQualityStatus.PASSED; + + for (TaskState taskState : getTaskStates()) { + String qualityResult = taskState.getProp(TaskLevelPolicyChecker.TASK_LEVEL_POLICY_RESULT_KEY); + log.info("Data quality status of this task is: " + qualityResult); + if (qualityResult != null && !TaskLevelPolicyChecker.DataQualityStatus.PASSED.name().equals(qualityResult)) { + log.info("Data quality not passed: " + qualityResult); + jobDataQuality = TaskLevelPolicyChecker.DataQualityStatus.FAILED; + break; Review Comment: instead of stopping the loop on any failure, it would be better to iterate over all tasks and add a log with aggregate count that DQ failed for `M` out of `N` tasks ########## gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java: ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.policies.size; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class FileSizePolicyTest { + + @Test + public void testPolicyPass() { + State state = new State(); + state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L); + state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 1000L); + + FileSizePolicy policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL); + Assert.assertTrue(policy.executePolicy().equals(TaskLevelPolicy.Result.PASSED)); + } + + @Test + public void testPolicyFail() { + State state = new State(); + state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L); + state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 900L); + + FileSizePolicy policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL); + Assert.assertTrue(policy.executePolicy().equals(TaskLevelPolicy.Result.FAILED)); + } + Review Comment: add a test case if property is missing or if only one property is set ########## gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobStateTest.java: ########## @@ -207,4 +208,65 @@ public void testToJobExecutionInfo() { Collections.sort(taskStateIds); Assert.assertEquals(taskStateIds, Lists.newArrayList("TestTask-0", "TestTask-1", "TestTask-2")); } + + @Test + public void testDataQualityStatus() { + // Create a new job state for testing data quality + JobState.DatasetState datasetState = new JobState.DatasetState("DataQualityTestJob", "DataQualityTestJob-1"); + + // Create task states with different data quality results + for (int i = 0; i < 3; i++) { + WorkUnit workUnit = WorkUnit.createEmpty(); + WorkUnitState workUnitState = new WorkUnitState(workUnit); + workUnitState.setProp(ConfigurationKeys.JOB_ID_KEY, "DataQualityTestJob-1"); + workUnitState.setProp(ConfigurationKeys.TASK_ID_KEY, "DataQualityTask-" + i); + workUnitState.setProp(ConfigurationKeys.DATASET_URN_KEY, "TestDataset"); 
+ + TaskState taskState = new TaskState(workUnitState); + taskState.setTaskId("DataQualityTask-" + i); + taskState.setWorkingState(WorkUnitState.WorkingState.SUCCESSFUL); + + // Set different data quality results for each task + switch (i) { + case 0: + // First task passes data quality + taskState.setProp(TaskLevelPolicyChecker.TASK_LEVEL_POLICY_RESULT_KEY, "PASSED"); Review Comment: use enum/constant ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java: ########## @@ -788,12 +798,60 @@ public int getJobFailures() { return Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY)); } + /** + * Computes and stores the overall data quality status based on task-level policy results. + * The status will be "PASSED" if all tasks passed their quality checks, "FAILED" otherwise. + */ + public void computeAndStoreQualityStatus(JobState jobState) { + TaskLevelPolicyChecker.DataQualityStatus jobDataQuality = TaskLevelPolicyChecker.DataQualityStatus.PASSED; + + for (TaskState taskState : getTaskStates()) { + String qualityResult = taskState.getProp(TaskLevelPolicyChecker.TASK_LEVEL_POLICY_RESULT_KEY); + log.info("Data quality status of this task is: " + qualityResult); + if (qualityResult != null && !TaskLevelPolicyChecker.DataQualityStatus.PASSED.name().equals(qualityResult)) { Review Comment: it would be better to get the `qualityResult` in an enum and then compare it with `DataQualityStatus.PASSED` ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java: ########## @@ -316,6 +317,23 @@ private void completeShutdown() { this.shutdownLatch.countDown(); } + private void computeAndUpdateTaskDataQuality() { + String overallTaskDataQuality = TaskLevelPolicyChecker.DataQualityStatus.PASSED.name(); + for (Optional<Fork> fork : this.forks.keySet()) { + if (fork.isPresent()) { + TaskState forkTaskState = fork.get().getTaskState(); + if (forkTaskState != null) { + String forkDataQualityResult = 
forkTaskState.getProp(TaskLevelPolicyChecker.TASK_LEVEL_POLICY_RESULT_KEY); Review Comment: get the result in an enum and then use it to compare ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/IncorrectSizeFileAwareInputStreamDataWriter.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.writer; + +import java.io.IOException; +import java.io.InputStream; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.data.management.copy.CopyableFile; +import org.apache.gobblin.data.management.copy.FileAwareInputStream; +import org.apache.gobblin.policies.size.FileSizePolicy; +import org.apache.gobblin.writer.DataWriter; +import org.apache.hadoop.fs.Path; + +/** + * A {@link DataWriter} that extends {@link FileAwareInputStreamDataWriter} to intentionally report incorrect file sizes. + * This is useful for testing data quality checks that verify file sizes. + * Review Comment: +1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org