This is an automated email from the ASF dual-hosted git repository.

vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c6710322f Evaluate data quality only when Policies are applied (#4127)
8c6710322f is described below

commit 8c6710322f8cba2a0d7d5897867b46cd2da51cfe
Author: vsinghal85 <vsingha...@gmail.com>
AuthorDate: Fri Aug 8 13:55:06 2025 +0530

    Evaluate data quality only when Policies are applied (#4127)
    
    * add shouldEvaluate data quality check at dataset level
    
    * Refactor checks for invoking evaluateAndEmitDataQuality
    
    ---------
    
    Co-authored-by: Vaibhav Singhal <vaibs...@vaibsing-mn7618.linkedin.biz>
---
 .../apache/gobblin/runtime/SafeDatasetCommit.java  | 26 ++++++++++++++--------
 1 file changed, 17 insertions(+), 9 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index de355e47d0..a522c21ed7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -94,14 +94,8 @@ public final class SafeDatasetCommit implements Callable<Void> {
     metricContext = Instrumented.getMetricContext(datasetState, SafeDatasetCommit.class);
 
     finalizeDatasetStateBeforeCommit(this.datasetState);
-    // evaluate data quality at the dataset commit level, only when commit source is CommitActivityImpl
-    if (SafeDatasetCommit.COMMIT_SRC_COMMIT_ACTIVITY_IMPL.equals(this.datasetCommitSrc)) {
-      log.info("Evaluating data quality for commit activity for dataset {}.", this.datasetUrn);
-       evaluateAndEmitDatasetQuality();
-    } else {
-      log.info("Skipping data quality evaluation for dataset {} as commit source is {}", this.datasetUrn,
-          this.datasetCommitSrc);
-    }
+    // evaluate data quality at the dataset commit level
+    evaluateAndEmitDatasetQuality();
     Class<? extends DataPublisher> dataPublisherClass;
     try (Closer closer = Closer.create()) {
       dataPublisherClass = JobContext.getJobDataPublisherClass(this.jobContext.getJobState())
@@ -455,9 +449,23 @@ public final class SafeDatasetCommit implements Callable<Void> {
    * This method handles the business logic of data quality evaluation
    * at the dataset commit level, which is more appropriate than having
    * it in the JobState data container.
+   *
+   * Data quality evaluation is only performed when:
+   * 1. Commit source is CommitActivityImpl
+   * 2. Data quality policies are applied to the job
    */
   private void evaluateAndEmitDatasetQuality() {
-    DataQualityEvaluator.evaluateAndReportDatasetQuality(this.datasetState, this.jobContext.getJobState());
+    JobState jobState = this.jobContext.getJobState();
+    String policiesApplied = jobState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST, StringUtils.EMPTY);
+    log.info("Policies applied: {}", policiesApplied);
+    boolean shouldEvaluateDataQuality = !policiesApplied.isEmpty();
+    if (shouldEvaluateDataQuality && SafeDatasetCommit.COMMIT_SRC_COMMIT_ACTIVITY_IMPL.equals(this.datasetCommitSrc)) {
+      log.info("Evaluating data quality for commit activity for dataset {}.", this.datasetUrn);
+      DataQualityEvaluator.evaluateAndReportDatasetQuality(this.datasetState, jobState);
+    } else {
+      log.info("Skipping data quality evaluation for dataset {} as commit source is {} and policies applied are {}",
+          this.datasetUrn, this.datasetCommitSrc, policiesApplied);
+    }
   }
 
 }

Reply via email to