Blazer-007 commented on code in PR #4113: URL: https://github.com/apache/gobblin/pull/4113#discussion_r2093977179
########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java: ########## @@ -788,12 +798,60 @@ public int getJobFailures() { return Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY)); } + /** + * Computes and stores the overall data quality status based on task-level policy results. + * The status will be "PASSED" if all tasks passed their quality checks, "FAILED" otherwise. + */ + public void computeAndStoreQualityStatus(JobState jobState) { + TaskLevelPolicyChecker.DataQualityStatus jobDataQuality = TaskLevelPolicyChecker.DataQualityStatus.PASSED; + + for (TaskState taskState : getTaskStates()) { + String qualityResult = taskState.getProp(TaskLevelPolicyChecker.TASK_LEVEL_POLICY_RESULT_KEY); + log.info("Data quality status of this task is: " + qualityResult); + if (qualityResult != null && !TaskLevelPolicyChecker.DataQualityStatus.PASSED.name().equals(qualityResult)) { + log.info("Data quality not passed: " + qualityResult); + jobDataQuality = TaskLevelPolicyChecker.DataQualityStatus.FAILED; + break; + } + } + super.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, jobDataQuality.name()); + + // Emit OTEL metrics for data quality + OpenTelemetryMetricsBase otelMetrics = OpenTelemetryMetrics.getInstance(jobState); + if (otelMetrics != null) { + Attributes tags = Attributes.builder() + .put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobState.getJobName()) + .put(TimingEvent.DATASET_URN, this.getDatasetUrn()) + .build(); + + // Emit data quality status (1 for PASSED, 0 for FAILED) + TaskLevelPolicyChecker.DataQualityStatus finalJobDataQuality = jobDataQuality; + log.info("Data quality status for this job is " + finalJobDataQuality); + otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME) + .gaugeBuilder(ServiceMetricNames.DATA_QUALITY_STATUS_METRIC_NAME) + .ofLongs() + .buildWithCallback(measurement -> { + log.info("Emitting metric for data quality"); + 
measurement.record(TaskLevelPolicyChecker.DataQualityStatus.PASSED.equals(finalJobDataQuality) ? 1 : 0, tags); + }); + } + } + + /** + * Gets the overall data quality status of the dataset. + * @return "PASSED" if all tasks passed their quality checks, "FAILED" otherwise + */ + public String getDataQualityStatus() { + return super.getProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, TaskLevelPolicyChecker.DataQualityStatus.FAILED.name()); + } + @Override protected void propsToJson(JsonWriter jsonWriter) throws IOException { jsonWriter.beginObject(); jsonWriter.name(ConfigurationKeys.DATASET_URN_KEY).value(getDatasetUrn()); jsonWriter.name(ConfigurationKeys.JOB_FAILURES_KEY).value(getJobFailures()); + jsonWriter.name(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY).value(getDataQualityStatus()); jsonWriter.endObject(); Review Comment: should this be added to `writeStateSummary(...)` function too below ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java: ########## @@ -788,12 +798,60 @@ public int getJobFailures() { return Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY)); } + /** + * Computes and stores the overall data quality status based on task-level policy results. + * The status will be "PASSED" if all tasks passed their quality checks, "FAILED" otherwise. + */ + public void computeAndStoreQualityStatus(JobState jobState) { + TaskLevelPolicyChecker.DataQualityStatus jobDataQuality = TaskLevelPolicyChecker.DataQualityStatus.PASSED; Review Comment: `computeAndStoreQualityStatus` --> `computeAndStoreDatasetQualityStatus` ?? ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java: ########## @@ -788,12 +798,60 @@ public int getJobFailures() { return Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY)); } + /** + * Computes and stores the overall data quality status based on task-level policy results. 
+ * The status will be "PASSED" if all tasks passed their quality checks, "FAILED" otherwise. + */ + public void computeAndStoreQualityStatus(JobState jobState) { + TaskLevelPolicyChecker.DataQualityStatus jobDataQuality = TaskLevelPolicyChecker.DataQualityStatus.PASSED; + + for (TaskState taskState : getTaskStates()) { + String qualityResult = taskState.getProp(TaskLevelPolicyChecker.TASK_LEVEL_POLICY_RESULT_KEY); + log.info("Data quality status of this task is: " + qualityResult); + if (qualityResult != null && !TaskLevelPolicyChecker.DataQualityStatus.PASSED.name().equals(qualityResult)) { + log.info("Data quality not passed: " + qualityResult); + jobDataQuality = TaskLevelPolicyChecker.DataQualityStatus.FAILED; + break; + } + } + super.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, jobDataQuality.name()); + + // Emit OTEL metrics for data quality + OpenTelemetryMetricsBase otelMetrics = OpenTelemetryMetrics.getInstance(jobState); + if (otelMetrics != null) { + Attributes tags = Attributes.builder() + .put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobState.getJobName()) + .put(TimingEvent.DATASET_URN, this.getDatasetUrn()) + .build(); + Review Comment: Instead of only the single jobName tag, I would prefer separate tags for flowGroup, flowName, and flowExecutionId. Other common tags such as fabric, flowEdgeId, sourceNode, destinationNode, and specExecutor should also be added across all new metrics. ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java: ########## @@ -22,6 +22,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; +import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyChecker; import org.slf4j.Logger; Review Comment: import order ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java: ########## @@ -90,6 +90,7 @@ public Void call() metricContext = Instrumented.getMetricContext(datasetState, 
SafeDatasetCommit.class); finalizeDatasetStateBeforeCommit(this.datasetState); + this.datasetState.computeAndStoreQualityStatus(this.jobContext.getJobState()); Class<? extends DataPublisher> dataPublisherClass; Review Comment: QQ: Is my understanding correct that we are only storing the result of the data quality check, and that the state of the workunit is not changed to failed or passed based on that check? ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java: ########## @@ -32,6 +32,7 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.BooleanUtils; +import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyChecker; Review Comment: import order -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org