This is an automated email from the ASF dual-hosted git repository. vivekrai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
     new 8c6710322f Evaluate data quality only when Policies are applied (#4127)
8c6710322f is described below

commit 8c6710322f8cba2a0d7d5897867b46cd2da51cfe
Author: vsinghal85 <vsingha...@gmail.com>
AuthorDate: Fri Aug 8 13:55:06 2025 +0530

    Evaluate data quality only when Policies are applied (#4127)

    * add shouldEvaluate data quality check at dataset level

    * Refactor checks for invoking evaluateAndEmitDataQuality

    ---------

    Co-authored-by: Vaibhav Singhal <vaibs...@vaibsing-mn7618.linkedin.biz>
---
 .../apache/gobblin/runtime/SafeDatasetCommit.java | 26 ++++++++++++++--------
 1 file changed, 17 insertions(+), 9 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index de355e47d0..a522c21ed7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -94,14 +94,8 @@ public final class SafeDatasetCommit implements Callable<Void> {
     metricContext = Instrumented.getMetricContext(datasetState, SafeDatasetCommit.class);
 
     finalizeDatasetStateBeforeCommit(this.datasetState);
-    // evaluate data quality at the dataset commit level, only when commit source is CommitActivityImpl
-    if (SafeDatasetCommit.COMMIT_SRC_COMMIT_ACTIVITY_IMPL.equals(this.datasetCommitSrc)) {
-      log.info("Evaluating data quality for commit activity for dataset {}.", this.datasetUrn);
-      evaluateAndEmitDatasetQuality();
-    } else {
-      log.info("Skipping data quality evaluation for dataset {} as commit source is {}", this.datasetUrn,
-          this.datasetCommitSrc);
-    }
+    // evaluate data quality at the dataset commit level
+    evaluateAndEmitDatasetQuality();
     Class<? extends DataPublisher> dataPublisherClass;
     try (Closer closer = Closer.create()) {
       dataPublisherClass = JobContext.getJobDataPublisherClass(this.jobContext.getJobState())
@@ -455,9 +449,23 @@ public final class SafeDatasetCommit implements Callable<Void> {
    * This method handles the business logic of data quality evaluation
    * at the dataset commit level, which is more appropriate than having
    * it in the JobState data container.
+   *
+   * Data quality evaluation is only performed when:
+   * 1. Commit source is CommitActivityImpl
+   * 2. Data quality policies are applied to the job
    */
   private void evaluateAndEmitDatasetQuality() {
-    DataQualityEvaluator.evaluateAndReportDatasetQuality(this.datasetState, this.jobContext.getJobState());
+    JobState jobState = this.jobContext.getJobState();
+    String policiesApplied = jobState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST, StringUtils.EMPTY);
+    log.info("Policies applied: {}", policiesApplied);
+    boolean shouldEvaluateDataQuality = !policiesApplied.isEmpty();
+    if (shouldEvaluateDataQuality && SafeDatasetCommit.COMMIT_SRC_COMMIT_ACTIVITY_IMPL.equals(this.datasetCommitSrc)) {
+      log.info("Evaluating data quality for commit activity for dataset {}.", this.datasetUrn);
+      DataQualityEvaluator.evaluateAndReportDatasetQuality(this.datasetState, jobState);
+    } else {
+      log.info("Skipping data quality evaluation for dataset {} as commit source is {} and policies applied are {}",
+          this.datasetUrn, this.datasetCommitSrc, policiesApplied);
+    }
   }
 }