This is an automated email from the ASF dual-hosted git repository.
vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8c6710322f Evaluate data quality only when Policies are applied (#4127)
8c6710322f is described below
commit 8c6710322f8cba2a0d7d5897867b46cd2da51cfe
Author: vsinghal85 <[email protected]>
AuthorDate: Fri Aug 8 13:55:06 2025 +0530
Evaluate data quality only when Policies are applied (#4127)
* Add a shouldEvaluateDataQuality check at the dataset level
* Refactor the checks that guard invoking evaluateAndEmitDatasetQuality
---------
Co-authored-by: Vaibhav Singhal <[email protected]>
---
.../apache/gobblin/runtime/SafeDatasetCommit.java | 26 ++++++++++++++--------
1 file changed, 17 insertions(+), 9 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index de355e47d0..a522c21ed7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -94,14 +94,8 @@ public final class SafeDatasetCommit implements Callable<Void> {
metricContext = Instrumented.getMetricContext(datasetState,
SafeDatasetCommit.class);
finalizeDatasetStateBeforeCommit(this.datasetState);
- // evaluate data quality at the dataset commit level, only when commit
source is CommitActivityImpl
- if
(SafeDatasetCommit.COMMIT_SRC_COMMIT_ACTIVITY_IMPL.equals(this.datasetCommitSrc))
{
- log.info("Evaluating data quality for commit activity for dataset {}.",
this.datasetUrn);
- evaluateAndEmitDatasetQuality();
- } else {
- log.info("Skipping data quality evaluation for dataset {} as commit
source is {}", this.datasetUrn,
- this.datasetCommitSrc);
- }
+ // evaluate data quality at the dataset commit level
+ evaluateAndEmitDatasetQuality();
Class<? extends DataPublisher> dataPublisherClass;
try (Closer closer = Closer.create()) {
dataPublisherClass =
JobContext.getJobDataPublisherClass(this.jobContext.getJobState())
@@ -455,9 +449,23 @@ public final class SafeDatasetCommit implements Callable<Void> {
* This method handles the business logic of data quality evaluation
* at the dataset commit level, which is more appropriate than having
* it in the JobState data container.
+ *
+ * Data quality evaluation is only performed when:
+ * 1. Commit source is CommitActivityImpl
+ * 2. Data quality policies are applied to the job
*/
private void evaluateAndEmitDatasetQuality() {
- DataQualityEvaluator.evaluateAndReportDatasetQuality(this.datasetState,
this.jobContext.getJobState());
+ JobState jobState = this.jobContext.getJobState();
+ String policiesApplied =
jobState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST, StringUtils.EMPTY);
+ log.info("Policies applied: {}", policiesApplied);
+ boolean shouldEvaluateDataQuality = !policiesApplied.isEmpty();
+ if (shouldEvaluateDataQuality &&
SafeDatasetCommit.COMMIT_SRC_COMMIT_ACTIVITY_IMPL.equals(this.datasetCommitSrc))
{
+ log.info("Evaluating data quality for commit activity for dataset {}.",
this.datasetUrn);
+ DataQualityEvaluator.evaluateAndReportDatasetQuality(this.datasetState,
jobState);
+ } else {
+ log.info("Skipping data quality evaluation for dataset {} as commit
source is {} and policies applied are {}",
+ this.datasetUrn, this.datasetCommitSrc, policiesApplied);
+ }
}
}