khandelwal-prateek commented on code in PR #4113:
URL: https://github.com/apache/gobblin/pull/4113#discussion_r2094602080


##########
gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyChecker.java:
##########
@@ -40,19 +41,29 @@ public enum DataQualityStatus {
     NOT_EVALUATED
   }
   private final List<TaskLevelPolicy> list;
+  private final State state;
   private static final Logger LOG = 
LoggerFactory.getLogger(TaskLevelPolicyChecker.class);
 
-  public TaskLevelPolicyChecker(List<TaskLevelPolicy> list) {
+  public static final String TASK_LEVEL_POLICY_RESULT_KEY = 
"gobblin.task.level.policy.result";
+
+  public TaskLevelPolicyChecker(List<TaskLevelPolicy> list, State state) {
     this.list = list;
+    this.state = state;
   }
 
   public TaskLevelPolicyCheckResults executePolicies() {
     TaskLevelPolicyCheckResults results = new TaskLevelPolicyCheckResults();
+    boolean allRequiredPoliciesPassed = true;
     for (TaskLevelPolicy p : this.list) {
       TaskLevelPolicy.Result result = p.executePolicy();
       results.getPolicyResults().put(result, p.getType());
+      if(TaskLevelPolicy.Type.FAIL.equals(p.getType()) && 
TaskLevelPolicy.Result.FAILED.name().equals(result.name())){

Review Comment:
   nit: add a space after `if` (`if (`) and before the opening brace (` {`)



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java:
##########
@@ -48,9 +48,12 @@ public boolean isEmpty() {
     return throwables.isEmpty();
   }
 
-  public ForkException getAggregatedException (List<Integer> failedForkIds, 
String taskId) {
+  public ForkException getAggregatedException (List<Integer> failedForkIds, 
String taskId, String taskDataQuality) {
     StringBuffer stringBuffer = new StringBuffer();
     stringBuffer.append("Fork branches " + failedForkIds + " failed for task " 
+ taskId + "\n");
+    if(taskDataQuality!=null && !"PASSED".equals(taskDataQuality)){

Review Comment:
   let's avoid hardcoded strings like "PASSED". Please create a constant or 
use an enum for the data quality status.
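
   A minimal sketch of the enum-based comparison, assuming a local stand-in for 
`TaskLevelPolicyChecker.DataQualityStatus`. The class name `DataQualityMessage` 
and the helper `isNotPassed` are hypothetical and not part of the PR.

```java
class DataQualityMessage {
  // Stand-in for TaskLevelPolicyChecker.DataQualityStatus (assumed values).
  enum DataQualityStatus { PASSED, FAILED, NOT_EVALUATED }

  // Hypothetical helper: true when a status string is present and is not PASSED.
  // The literal "PASSED" is replaced by the enum constant's name.
  static boolean isNotPassed(String taskDataQuality) {
    return taskDataQuality != null
        && !DataQualityStatus.PASSED.name().equals(taskDataQuality);
  }

  public static void main(String[] args) {
    System.out.println(isNotPassed("FAILED")); // prints "true"
    System.out.println(isNotPassed("PASSED")); // prints "false"
    System.out.println(isNotPassed(null));     // prints "false"
  }
}
```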



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java:
##########
@@ -90,7 +91,8 @@ public void onTaskRunCompletion(Task task) {
       // Check the task state and handle task retry if task failed and
       // it has not reached the maximum number of retries
       WorkUnitState.WorkingState state = task.getTaskState().getWorkingState();
-      if (state == WorkUnitState.WorkingState.FAILED && task.getRetryCount() < 
this.maxTaskRetries) {
+      String dataQualityResult = 
task.getTaskState().getProp(TaskLevelPolicyChecker.TASK_LEVEL_POLICY_RESULT_KEY);
+      if 
(TaskLevelPolicyChecker.DataQualityStatus.FAILED.name().equals(dataQualityResult)
 || state == WorkUnitState.WorkingState.FAILED && task.getRetryCount() < 
this.maxTaskRetries) {

Review Comment:
   I think we need to retry on either a data quality failure or a task 
failure, up to the retry count. However, since `&&` has higher precedence than 
`||`, this condition is interpreted as 
   `if ((DataQualityStatus == FAILED) || (state == FAILED && retryCount < 
max))`, so it would be stuck in an infinite retry loop if DataQualityStatus is 
FAILED. Please add parentheses to get the intended logic.
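
   To illustrate the precedence issue, here is a small self-contained sketch 
that reduces the condition to booleans and ints. The class and method names are 
hypothetical; only the shape of the two expressions comes from the diff.

```java
class RetryConditionDemo {
  // The condition as written in the diff: && binds tighter than ||,
  // so the retry cap only guards the task-failure branch.
  static boolean asWritten(boolean dqFailed, boolean taskFailed, int retryCount, int maxRetries) {
    return dqFailed || taskFailed && retryCount < maxRetries;
  }

  // Explicit grouping: the retry cap applies to both failure kinds.
  static boolean withParentheses(boolean dqFailed, boolean taskFailed, int retryCount, int maxRetries) {
    return (dqFailed || taskFailed) && retryCount < maxRetries;
  }

  public static void main(String[] args) {
    // Data quality failed and the retry count already exceeds the maximum.
    System.out.println(asWritten(true, false, 5, 3));       // prints "true"
    System.out.println(withParentheses(true, false, 5, 3)); // prints "false"
  }
}
```

   With the ungrouped form, a data quality failure requests a retry no matter 
how many retries have already happened.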
   
   



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/IncorrectSizeFileAwareInputStreamDataWriter.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.writer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.policies.size.FileSizePolicy;
+import org.apache.gobblin.writer.DataWriter;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A {@link DataWriter} that extends {@link FileAwareInputStreamDataWriter} to 
intentionally report incorrect file sizes.
+ * This is useful for testing data quality checks that verify file sizes.
+ *
+ * The writer actually writes the correct data to the destination, but reports 
incorrect sizes in the bytesWritten() method.
+ * The size discrepancy can be configured through properties:
+ * - gobblin.copy.incorrect.size.ratio: Ratio to multiply actual size by 
(default 1.0)

Review Comment:
   the default value used is `0.9` actually



##########
gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyChecker.java:
##########
@@ -40,19 +41,29 @@ public enum DataQualityStatus {
     NOT_EVALUATED
   }
   private final List<TaskLevelPolicy> list;
+  private final State state;
   private static final Logger LOG = 
LoggerFactory.getLogger(TaskLevelPolicyChecker.class);
 
-  public TaskLevelPolicyChecker(List<TaskLevelPolicy> list) {
+  public static final String TASK_LEVEL_POLICY_RESULT_KEY = 
"gobblin.task.level.policy.result";
+
+  public TaskLevelPolicyChecker(List<TaskLevelPolicy> list, State state) {
     this.list = list;
+    this.state = state;
   }
 
   public TaskLevelPolicyCheckResults executePolicies() {
     TaskLevelPolicyCheckResults results = new TaskLevelPolicyCheckResults();
+    boolean allRequiredPoliciesPassed = true;
     for (TaskLevelPolicy p : this.list) {
       TaskLevelPolicy.Result result = p.executePolicy();
       results.getPolicyResults().put(result, p.getType());
+      if(TaskLevelPolicy.Type.FAIL.equals(p.getType()) && 
TaskLevelPolicy.Result.FAILED.name().equals(result.name())){
+        allRequiredPoliciesPassed = false;
+      }
       LOG.info("TaskLevelPolicy " + p + " of type " + p.getType() + " executed 
with result " + result);
     }
+    state.setProp(TASK_LEVEL_POLICY_RESULT_KEY,
+        allRequiredPoliciesPassed ? DataQualityStatus.PASSED.name() : 
DataQualityStatus.FAILED.name());

Review Comment:
   `TaskLevelPolicyChecker` is a generic policy executor. This change tightly 
couples it to data quality evaluation by writing DataQualityStatus into a 
`State` variable. Please move this logic to a higher-level component (i.e. the 
caller), so that this class is responsible for policy execution only.
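
   One possible shape for this, as a sketch only: the checker keeps returning 
raw results, and the caller derives the status from them. All types below are 
simplified stand-ins. `PolicyOutcome` and `deriveStatus` are hypothetical names, 
and the real `TaskLevelPolicyCheckResults` API may differ.

```java
import java.util.ArrayList;
import java.util.List;

class CallerSideStatus {
  // Stand-ins for TaskLevelPolicy.Type, TaskLevelPolicy.Result and DataQualityStatus.
  enum Type { FAIL, OPTIONAL }
  enum Result { PASSED, FAILED }
  enum DataQualityStatus { PASSED, FAILED }

  // Simplified record of one executed policy: its type and its outcome.
  static final class PolicyOutcome {
    final Type type;
    final Result result;

    PolicyOutcome(Type type, Result result) {
      this.type = type;
      this.result = result;
    }
  }

  // Caller-side derivation: FAILED if any policy of type FAIL failed, else PASSED.
  static DataQualityStatus deriveStatus(List<PolicyOutcome> outcomes) {
    for (PolicyOutcome outcome : outcomes) {
      if (outcome.type == Type.FAIL && outcome.result == Result.FAILED) {
        return DataQualityStatus.FAILED;
      }
    }
    return DataQualityStatus.PASSED;
  }

  public static void main(String[] args) {
    List<PolicyOutcome> outcomes = new ArrayList<>();
    outcomes.add(new PolicyOutcome(Type.OPTIONAL, Result.FAILED));
    System.out.println(deriveStatus(outcomes)); // prints "PASSED"
    outcomes.add(new PolicyOutcome(Type.FAIL, Result.FAILED));
    System.out.println(deriveStatus(outcomes)); // prints "FAILED"
  }
}
```

   The caller can then write the derived status into its own state, and the 
checker needs no `State` argument.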



##########
gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class FileSizePolicyTest {
+
+  @Test
+  public void testPolicyPass() {
+    State state = new State();
+    state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L);
+    state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 1000L);
+
+    FileSizePolicy policy = new FileSizePolicy(state, 
TaskLevelPolicy.Type.FAIL);
+    
Assert.assertTrue(policy.executePolicy().equals(TaskLevelPolicy.Result.PASSED));

Review Comment:
   use `Assert.assertEquals`



##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written 
for a file copy operation.
+ */
+public class FileSizePolicy extends TaskLevelPolicy {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FileSizePolicy.class);
+
+  public static final String BYTES_READ_KEY = "gobblin.copy.bytesRead";
+  public static final String BYTES_WRITTEN_KEY = "gobblin.copy.bytesWritten";
+
+  private final long bytesRead;
+  private final long bytesWritten;
+
+  public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+    super(state, type);
+    this.bytesRead = state.getPropAsLong(BYTES_READ_KEY, 0);
+    this.bytesWritten = state.getPropAsLong(BYTES_WRITTEN_KEY, 0);
+  }
+
+  @Override
+  public Result executePolicy() {
+    double sizeDifference = Math.abs(this.bytesRead - this.bytesWritten);
+
+    if (sizeDifference == 0) {
+      return Result.PASSED;
+    }
+
+    LOG.warn("File size check failed - bytes read: {}, bytes written: {}, 
difference: {}",
+        this.bytesRead, this.bytesWritten, sizeDifference);
+    return Result.FAILED;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]",
+        this.bytesRead, this.bytesWritten);
+  }
+
+}

Review Comment:
   nit: add newline at end of file in all new files



##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written 
for a file copy operation.
+ */
+public class FileSizePolicy extends TaskLevelPolicy {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FileSizePolicy.class);
+
+  public static final String BYTES_READ_KEY = "gobblin.copy.bytesRead";
+  public static final String BYTES_WRITTEN_KEY = "gobblin.copy.bytesWritten";
+
+  private final long bytesRead;
+  private final long bytesWritten;
+
+  public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+    super(state, type);
+    this.bytesRead = state.getPropAsLong(BYTES_READ_KEY, 0);
+    this.bytesWritten = state.getPropAsLong(BYTES_WRITTEN_KEY, 0);

Review Comment:
   If props are missing, defaulting to 0 might hide the actual issue and cause 
false successes/failures. Is there a case where we expect the default value to 
be used? If not, let's validate that both props are explicitly set.
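
   A sketch of what explicit validation could look like, using 
`java.util.Properties` as a stand-in for Gobblin's `State`. The helper 
`getRequiredLong` is hypothetical, and throwing is only one option (returning a 
failed policy result would be another).

```java
import java.util.Properties;

class RequiredLongProp {
  // Hypothetical helper: reads a long-valued property and fails loudly when it
  // is absent, instead of silently falling back to 0.
  static long getRequiredLong(Properties props, String key) {
    String value = props.getProperty(key);
    if (value == null) {
      throw new IllegalStateException("Missing required property: " + key);
    }
    return Long.parseLong(value.trim());
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("gobblin.copy.bytesRead", "1000");
    System.out.println(getRequiredLong(props, "gobblin.copy.bytesRead")); // prints "1000"
    try {
      getRequiredLong(props, "gobblin.copy.bytesWritten");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // prints "Missing required property: gobblin.copy.bytesWritten"
    }
  }
}
```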



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java:
##########
@@ -308,6 +312,8 @@ public boolean apply(FileStatus input) {
         os.close();
         log.info("OutputStream for file {} is closed.", writeAt);
         inputStream.close();
+        long actualFileSize = this.fs.getFileStatus(writeAt).getLen();
+        this.state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, actualFileSize);

Review Comment:
   `fs.getFileStatus` can throw an IOException. It would be better to move 
this to a helper method, e.g. `recordBytesWritten`, which can also log the 
exception.
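
   A rough sketch of such a helper. To stay self-contained it uses 
`java.nio.file.Files.size` and `java.util.Properties` as stand-ins for the 
Hadoop `FileSystem` and Gobblin `State`, and `System.err` instead of the class 
logger. Whether the helper should swallow or rethrow the exception is an open 
design choice; this version swallows it and reports failure to the caller.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

class BytesWrittenRecorder {
  static final String BYTES_WRITTEN_KEY = "gobblin.copy.bytesWritten";

  // Hypothetical helper: looks up the size of the written file and stores it
  // in the state. Returns false (after logging) if the lookup fails.
  static boolean recordBytesWritten(Path writeAt, Properties state) {
    try {
      long actualFileSize = Files.size(writeAt);
      state.setProperty(BYTES_WRITTEN_KEY, Long.toString(actualFileSize));
      return true;
    } catch (IOException e) {
      System.err.println("Could not determine size of " + writeAt + ": " + e);
      return false;
    }
  }

  public static void main(String[] args) throws IOException {
    Properties state = new Properties();
    Path tmp = Files.createTempFile("bytes-written-sketch", ".bin");
    try {
      Files.write(tmp, new byte[] {1, 2, 3, 4, 5});
      System.out.println(recordBytesWritten(tmp, state));    // prints "true"
      System.out.println(state.getProperty(BYTES_WRITTEN_KEY)); // prints "5"
    } finally {
      Files.deleteIfExists(tmp);
    }
  }
}
```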



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java:
##########
@@ -788,12 +798,60 @@ public int getJobFailures() {
       return 
Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY));
     }
 
+    /**
+     * Computes and stores the overall data quality status based on task-level 
policy results.
+     * The status will be "PASSED" if all tasks passed their quality checks, 
"FAILED" otherwise.
+     */
+    public void computeAndStoreQualityStatus(JobState jobState) {
+      TaskLevelPolicyChecker.DataQualityStatus jobDataQuality = 
TaskLevelPolicyChecker.DataQualityStatus.PASSED;
+
+      for (TaskState taskState : getTaskStates()) {
+        String qualityResult = 
taskState.getProp(TaskLevelPolicyChecker.TASK_LEVEL_POLICY_RESULT_KEY);
+        log.info("Data quality status of this task is: " + qualityResult);
+        if (qualityResult != null && 
!TaskLevelPolicyChecker.DataQualityStatus.PASSED.name().equals(qualityResult)) {
+          log.info("Data quality not passed: " + qualityResult);

Review Comment:
   use `log.warn` here



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/IncorrectSizeFileAwareInputStreamDataWriterBuilder.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.writer;
+
+import java.io.IOException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.writer.DataWriter;
+import org.apache.gobblin.writer.DataWriterBuilder;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Builder for {@link IncorrectSizeFileAwareInputStreamDataWriter}.
+ */
+public class IncorrectSizeFileAwareInputStreamDataWriterBuilder extends 
DataWriterBuilder<String, FileAwareInputStream> {
+
+  @Override
+  public final DataWriter<FileAwareInputStream> build() throws IOException {
+    setJobSpecificOutputPaths(this.destination.getProperties());
+    
this.destination.getProperties().setProp(ConfigurationKeys.WRITER_FILE_PATH, 
this.writerId);
+    return buildWriter();
+  }
+
+  protected DataWriter<FileAwareInputStream> buildWriter() throws IOException {
+    return new 
IncorrectSizeFileAwareInputStreamDataWriter(this.destination.getProperties(), 
this.branches, this.branch, this.writerAttemptId);
+  }
+
+  /**
+   * Each job gets its own task-staging and task-output directory. Update the 
staging and output directories to
+   * contain job_id. This is to make sure uncleaned data from previous 
execution does not corrupt final published data
+   * produced by this execution.
+   */
+  public synchronized static void setJobSpecificOutputPaths(State state) {
+    if 
(!StringUtils.containsIgnoreCase(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR),
+        state.getProp(ConfigurationKeys.JOB_ID_KEY))) {
+
+      state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new 
Path(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR),
+          state.getProp(ConfigurationKeys.JOB_ID_KEY)));
+      state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new 
Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
+          state.getProp(ConfigurationKeys.JOB_ID_KEY)));
+    }
+  }
+}

Review Comment:
   add a newline at end of file



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java:
##########
@@ -788,12 +798,60 @@ public int getJobFailures() {
       return 
Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY));
     }
 
+    /**
+     * Computes and stores the overall data quality status based on task-level 
policy results.
+     * The status will be "PASSED" if all tasks passed their quality checks, 
"FAILED" otherwise.
+     */
+    public void computeAndStoreQualityStatus(JobState jobState) {
+      TaskLevelPolicyChecker.DataQualityStatus jobDataQuality = 
TaskLevelPolicyChecker.DataQualityStatus.PASSED;
+
+      for (TaskState taskState : getTaskStates()) {
+        String qualityResult = 
taskState.getProp(TaskLevelPolicyChecker.TASK_LEVEL_POLICY_RESULT_KEY);
+        log.info("Data quality status of this task is: " + qualityResult);
+        if (qualityResult != null && 
!TaskLevelPolicyChecker.DataQualityStatus.PASSED.name().equals(qualityResult)) {
+          log.info("Data quality not passed: " + qualityResult);
+          jobDataQuality = TaskLevelPolicyChecker.DataQualityStatus.FAILED;
+          break;

Review Comment:
   Instead of stopping the loop on the first failure, it would be better to 
iterate over all tasks and log an aggregate count: DQ failed for `M` out of 
`N` tasks.
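
   A sketch of the aggregate-count variant, with task results reduced to plain 
strings and a stand-in enum. `countNotPassed` is a hypothetical name. As in the 
diff, a missing (`null`) result is not counted as a failure.

```java
import java.util.Arrays;
import java.util.List;

class DataQualityAggregation {
  // Stand-in for TaskLevelPolicyChecker.DataQualityStatus (assumed values).
  enum DataQualityStatus { PASSED, FAILED, NOT_EVALUATED }

  // Visits every task result instead of breaking on the first failure.
  static int countNotPassed(List<String> taskResults) {
    int notPassed = 0;
    for (String result : taskResults) {
      if (result != null && !DataQualityStatus.PASSED.name().equals(result)) {
        notPassed++;
      }
    }
    return notPassed;
  }

  public static void main(String[] args) {
    List<String> results = Arrays.asList("PASSED", "FAILED", null, "FAILED");
    int failed = countNotPassed(results);
    // prints "Data quality failed for 2 out of 4 tasks"
    System.out.println("Data quality failed for " + failed + " out of " + results.size() + " tasks");
  }
}
```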



##########
gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class FileSizePolicyTest {
+
+  @Test
+  public void testPolicyPass() {
+    State state = new State();
+    state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L);
+    state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 1000L);
+
+    FileSizePolicy policy = new FileSizePolicy(state, 
TaskLevelPolicy.Type.FAIL);
+    
Assert.assertTrue(policy.executePolicy().equals(TaskLevelPolicy.Result.PASSED));
+  }
+
+  @Test
+  public void testPolicyFail() {
+    State state = new State();
+    state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L);
+    state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 900L);
+
+    FileSizePolicy policy = new FileSizePolicy(state, 
TaskLevelPolicy.Type.FAIL);
+    
Assert.assertTrue(policy.executePolicy().equals(TaskLevelPolicy.Result.FAILED));
+  }
+

Review Comment:
   add a test case for when a property is missing or only one property is set



##########
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobStateTest.java:
##########
@@ -207,4 +208,65 @@ public void testToJobExecutionInfo() {
     Collections.sort(taskStateIds);
     Assert.assertEquals(taskStateIds, Lists.newArrayList("TestTask-0", 
"TestTask-1", "TestTask-2"));
   }
+
+  @Test
+  public void testDataQualityStatus() {
+    // Create a new job state for testing data quality
+    JobState.DatasetState datasetState = new 
JobState.DatasetState("DataQualityTestJob", "DataQualityTestJob-1");
+
+    // Create task states with different data quality results
+    for (int i = 0; i < 3; i++) {
+      WorkUnit workUnit = WorkUnit.createEmpty();
+      WorkUnitState workUnitState = new WorkUnitState(workUnit);
+      workUnitState.setProp(ConfigurationKeys.JOB_ID_KEY, 
"DataQualityTestJob-1");
+      workUnitState.setProp(ConfigurationKeys.TASK_ID_KEY, "DataQualityTask-" 
+ i);
+      workUnitState.setProp(ConfigurationKeys.DATASET_URN_KEY, "TestDataset");
+
+      TaskState taskState = new TaskState(workUnitState);
+      taskState.setTaskId("DataQualityTask-" + i);
+      taskState.setWorkingState(WorkUnitState.WorkingState.SUCCESSFUL);
+
+      // Set different data quality results for each task
+      switch (i) {
+        case 0:
+          // First task passes data quality
+          
taskState.setProp(TaskLevelPolicyChecker.TASK_LEVEL_POLICY_RESULT_KEY, 
"PASSED");

Review Comment:
   use enum/constant



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java:
##########
@@ -788,12 +798,60 @@ public int getJobFailures() {
       return 
Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY));
     }
 
+    /**
+     * Computes and stores the overall data quality status based on task-level 
policy results.
+     * The status will be "PASSED" if all tasks passed their quality checks, 
"FAILED" otherwise.
+     */
+    public void computeAndStoreQualityStatus(JobState jobState) {
+      TaskLevelPolicyChecker.DataQualityStatus jobDataQuality = 
TaskLevelPolicyChecker.DataQualityStatus.PASSED;
+
+      for (TaskState taskState : getTaskStates()) {
+        String qualityResult = 
taskState.getProp(TaskLevelPolicyChecker.TASK_LEVEL_POLICY_RESULT_KEY);
+        log.info("Data quality status of this task is: " + qualityResult);
+        if (qualityResult != null && 
!TaskLevelPolicyChecker.DataQualityStatus.PASSED.name().equals(qualityResult)) {

Review Comment:
   it would be better to parse `qualityResult` into an enum and then compare 
it with `DataQualityStatus.PASSED`
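
   A sketch of parsing the stored string into the enum before comparing. The 
enum is a local stand-in and `parse` is a hypothetical helper. Mapping missing 
or unrecognized values to `NOT_EVALUATED` is an assumption, not something the 
PR specifies.

```java
import java.util.Locale;

class DataQualityStatusParsing {
  // Stand-in for TaskLevelPolicyChecker.DataQualityStatus (assumed values).
  enum DataQualityStatus { PASSED, FAILED, NOT_EVALUATED }

  // Hypothetical helper: converts the raw property value into the enum.
  // Assumption: null or unknown values are treated as NOT_EVALUATED.
  static DataQualityStatus parse(String raw) {
    if (raw == null) {
      return DataQualityStatus.NOT_EVALUATED;
    }
    try {
      return DataQualityStatus.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      return DataQualityStatus.NOT_EVALUATED;
    }
  }

  public static void main(String[] args) {
    System.out.println(parse("PASSED") == DataQualityStatus.PASSED); // prints "true"
    System.out.println(parse(null));                                 // prints "NOT_EVALUATED"
    System.out.println(parse("bogus"));                              // prints "NOT_EVALUATED"
  }
}
```

   Note that the diff currently skips `null` results, so a caller using this 
helper would need to decide explicitly how `NOT_EVALUATED` is handled.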



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java:
##########
@@ -316,6 +317,23 @@ private void completeShutdown() {
     this.shutdownLatch.countDown();
   }
 
+  private void computeAndUpdateTaskDataQuality() {
+    String overallTaskDataQuality = 
TaskLevelPolicyChecker.DataQualityStatus.PASSED.name();
+    for (Optional<Fork> fork : this.forks.keySet()) {
+      if (fork.isPresent()) {
+        TaskState forkTaskState = fork.get().getTaskState();
+        if (forkTaskState != null) {
+          String forkDataQualityResult = 
forkTaskState.getProp(TaskLevelPolicyChecker.TASK_LEVEL_POLICY_RESULT_KEY);

Review Comment:
   get the result in an enum and then use it to compare



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/IncorrectSizeFileAwareInputStreamDataWriter.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.writer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.policies.size.FileSizePolicy;
+import org.apache.gobblin.writer.DataWriter;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A {@link DataWriter} that extends {@link FileAwareInputStreamDataWriter} to 
intentionally report incorrect file sizes.
+ * This is useful for testing data quality checks that verify file sizes.
+ *

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
