This is an automated email from the ASF dual-hosted git repository.
vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4c4d2ba612 [GOBBLIN-2204] Implement FileSize Data Quality for
FileBasedCopy
4c4d2ba612 is described below
commit 4c4d2ba61261c25de4930d96ba186874bf312082
Author: vsinghal85 <[email protected]>
AuthorDate: Tue Aug 5 20:55:11 2025 +0530
[GOBBLIN-2204] Implement FileSize Data Quality for FileBasedCopy
* Compute data quality and update task states
* Retry for failed data quality check
* Only do data quality check if data quality flag is enabled
* Computing overall data quality of dataset and adding it to dataset summary
* Changes for testing failed data quality changes
* Updating task state and computing overall data quality
* Refactor and checkstyle fix
* Propagate data quality from fork to task state
* Refactoring and unit tests
* Add unit tests for FileAwareInputStreamDataWriterTest
* refactor changes
* Compute task level data quality
* Some refactoring and code cleanup
* Update tests
* Fix checkstyle failures
* Code cleanup
* Adding metrics for overall data quality
* Address PR comments and add metric for number of files evaluated for data
quality
* Refactor dq evaluation
* Address PR comments
* Introduce data quality flag, to control and manage job failures due to
data quality
* Address PR comments
* Switch back to CounterBuilder approach for metrics
* Evaluate data quality only for commit via CommitActivity
* Remove unused enum
* move getDataQuality to JobState
* Address PR comments
* Fix the policy evaluation logic when both optional and mandatory policies
are provided
* Remove codestyle errors and remove flag check at task level
---------
Co-authored-by: Vaibhav Singhal <[email protected]>
---
.../gobblin/configuration/ConfigurationKeys.java | 6 +-
.../qualitychecker/task/TaskLevelPolicy.java | 2 +-
.../gobblin/policies/size/FileSizePolicy.java | 109 +++++++++
.../apache/gobblin/publisher/TaskPublisher.java | 5 +-
.../gobblin/qualitychecker/DataQualityStatus.java | 49 ++++
.../task/TaskLevelPolicyCheckResults.java | 5 +-
.../task/TaskLevelPolicyChecker.java | 22 +-
.../gobblin/policies/size/FileSizePolicyTest.java | 72 ++++++
.../qualitychecker/FailingTaskLevelPolicy.java} | 22 +-
.../RowCountTaskLevelPolicyTest.java | 13 +-
.../TaskLevelQualityCheckerTest.java | 69 ------
.../task/TaskLevelQualityCheckerTest.java | 106 +++++++++
.../writer/FileAwareInputStreamDataWriter.java | 22 +-
...ncorrectSizeFileAwareInputStreamDataWriter.java | 84 +++++++
...tSizeFileAwareInputStreamDataWriterBuilder.java | 60 +++++
.../writer/FileAwareInputStreamDataWriterTest.java | 42 ++++
.../apache/gobblin/metrics/event/TimingEvent.java | 4 +
.../apache/gobblin/metrics/ServiceMetricNames.java | 6 +
.../gobblin/quality/DataQualityEvaluator.java | 218 +++++++++++++++++
.../gobblin/runtime/AbstractJobLauncher.java | 4 +-
.../apache/gobblin/runtime/DatasetTaskSummary.java | 10 +-
.../gobblin/runtime/ForkThrowableHolder.java | 6 +-
.../org/apache/gobblin/runtime/JobContext.java | 2 +-
.../java/org/apache/gobblin/runtime/JobState.java | 55 +++--
.../apache/gobblin/runtime/SafeDatasetCommit.java | 22 ++
.../main/java/org/apache/gobblin/runtime/Task.java | 41 +++-
.../java/org/apache/gobblin/runtime/fork/Fork.java | 24 +-
.../runtime/local/LocalTaskStateTracker.java | 1 +
.../gobblin/quality/DataQualityEvaluatorTest.java | 265 +++++++++++++++++++++
.../org/apache/gobblin/runtime/JobStateTest.java | 4 +-
.../GaaSJobObservabilityProducerTest.java | 4 +-
.../ddm/activity/impl/CommitActivityImpl.java | 12 +-
.../gobblin/temporal/ddm/work/DatasetStats.java | 1 +
.../ddm/workflow/impl/CommitStepWorkflowImpl.java | 3 +-
34 files changed, 1220 insertions(+), 150 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 195ca295ee..d33a99c2e5 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -328,6 +328,9 @@ public class ConfigurationKeys {
public static final String DATASET_URN_KEY = "dataset.urn";
public static final String GLOBAL_WATERMARK_DATASET_URN =
"__globalDatasetWatermark";
public static final String DEFAULT_DATASET_URN = "";
+ public static final String DATASET_QUALITY_STATUS_KEY =
"dataset.quality.status";
+ public static final String ENFORCE_DATA_QUALITY_FAILURE_KEY =
"data.quality.enforce.failure";
+ public static final Boolean DEFAULT_ENFORCE_DATA_QUALITY_FAILURE = false;
/**
* Work unit related configuration properties.
@@ -498,6 +501,7 @@ public class ConfigurationKeys {
* Configuration properties used by the quality checker.
*/
public static final String QUALITY_CHECKER_PREFIX = "qualitychecker";
+ public static final String TASK_LEVEL_POLICY_RESULT_KEY =
"gobblin.task.level.policy.result";
public static final String TASK_LEVEL_POLICY_LIST = QUALITY_CHECKER_PREFIX +
".task.policies";
public static final String TASK_LEVEL_POLICY_LIST_TYPE =
QUALITY_CHECKER_PREFIX + ".task.policy.types";
public static final String ROW_LEVEL_POLICY_LIST = QUALITY_CHECKER_PREFIX +
".row.policies";
@@ -938,7 +942,7 @@ public class ConfigurationKeys {
public static final Boolean DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED
= false;
public static final String METRICS_REPORTING_OPENTELEMETRY_ENDPOINT =
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "endpoint";
-
+ public static final String METRICS_REPORTING_OPENTELEMETRY_FABRIC =
METRICS_REPORTING_OPENTELEMETRY_CONFIGS_PREFIX + "fabric";
// Headers to add to the OpenTelemetry HTTP Exporter, formatted as a JSON
String with string keys and values
public static final String METRICS_REPORTING_OPENTELEMETRY_HEADERS =
METRICS_REPORTING_OPENTELEMETRY_PREFIX + "headers";
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicy.java
b/gobblin-api/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicy.java
index d016adfe0a..b5043e01e8 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicy.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicy.java
@@ -22,7 +22,7 @@ import org.apache.gobblin.configuration.WorkUnitState;
public abstract class TaskLevelPolicy {
- private final State state;
+ protected final State state;
private final Type type;
public enum Type {
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java
b/gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java
new file mode 100644
index 0000000000..d5f935d3bc
--- /dev/null
+++
b/gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import java.util.Optional;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+ public static final String COPY_PREFIX = "gobblin.copy";
+ public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+ public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+ public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+ super(state, type);
+ }
+
+ @Override
+ public Result executePolicy() {
+ TransferBytes transferBytes =
getBytesReadAndWritten(this.state).orElse(null);
+ if (transferBytes == null) {
+ return Result.FAILED;
+ }
+ Long bytesRead = transferBytes.getBytesRead();
+ Long bytesWritten = transferBytes.getBytesWritten();
+
+ Long sizeDifference = Math.abs(bytesRead - bytesWritten);
+
+ if (sizeDifference == 0) {
+ return Result.PASSED;
+ }
+
+ log.warn("File size check failed - bytes read: {}, bytes written: {},
difference: {}", bytesRead, bytesWritten,
+ sizeDifference);
+ return Result.FAILED;
+ }
+
+ @Override
+ public String toString() {
+ TransferBytes transferBytes =
getBytesReadAndWritten(this.state).orElse(null);
+ if (transferBytes != null) {
+ return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]",
transferBytes.getBytesRead(),
+ transferBytes.getBytesWritten());
+ } else {
+ return "FileSizePolicy [bytesRead=null, bytesWritten=null]";
+ }
+ }
+
+ /**
+ * Helper class to hold transfer bytes information
+ */
+ @Getter
+ private static class TransferBytes {
+ final long bytesRead;
+ final long bytesWritten;
+
+ TransferBytes(long bytesRead, long bytesWritten) {
+ this.bytesRead = bytesRead;
+ this.bytesWritten = bytesWritten;
+ }
+ }
+
+ /**
+ * Extracts bytesRead and bytesWritten from the given state.
+ * Returns Empty Optional if parsing fails.
+ */
+ private Optional<TransferBytes> getBytesReadAndWritten(State state) {
+ String bytesReadString = state.getProp(BYTES_READ_KEY);
+ String bytesWrittenString = state.getProp(BYTES_WRITTEN_KEY);
+ if (bytesReadString == null || bytesWrittenString == null) {
+ log.error("Missing value(s): bytesReadStr={}, bytesWrittenStr={}",
bytesReadString, bytesWrittenString);
+ return Optional.empty();
+ }
+ try {
+ long bytesRead = Long.parseLong(bytesReadString);
+ long bytesWritten = Long.parseLong(bytesWrittenString);
+ return Optional.of(new TransferBytes(bytesRead, bytesWritten));
+ } catch (NumberFormatException e) {
+ log.error("Invalid number format for bytesRead or bytesWritten:
bytesRead='{}', bytesWritten='{}'",
+ bytesReadString, bytesWrittenString, e);
+ return Optional.empty();
+ }
+ }
+}
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TaskPublisher.java
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TaskPublisher.java
index 5ca87b0ea0..7a5ad4c1d6 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TaskPublisher.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TaskPublisher.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.publisher;
+import java.util.Set;
import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyCheckResults;
import java.util.Map;
@@ -63,8 +64,8 @@ public class TaskPublisher {
* Returns true if all tests from the PolicyChecker pass, false otherwise
*/
public boolean passedAllTests() {
- for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry :
this.results.getPolicyResults().entrySet()) {
- if (entry.getKey().equals(TaskLevelPolicy.Result.FAILED) &&
entry.getValue().equals(TaskLevelPolicy.Type.FAIL)) {
+ for (Map.Entry<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> entry :
this.results.getPolicyResults().entrySet()) {
+ if (entry.getKey().equals(TaskLevelPolicy.Result.FAILED) &&
entry.getValue().contains(TaskLevelPolicy.Type.FAIL)) {
return false;
}
}
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/DataQualityStatus.java
b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/DataQualityStatus.java
new file mode 100644
index 0000000000..855beeb729
--- /dev/null
+++
b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/DataQualityStatus.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.qualitychecker;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * An enumeration for possible statuses for Data quality checks.
+ * Its values will be:
+ * - PASSED: When all data quality checks pass
+ * - FAILED: When any data quality check fails
+ * - NOT_EVALUATED: When data quality check evaluation is not performed
+ * - UNKNOWN: when the data quality status is not recognized or invalid
+ */
+@Slf4j
+public enum DataQualityStatus {
+ PASSED,
+ FAILED,
+ NOT_EVALUATED,
+ UNKNOWN;
+
+ public static DataQualityStatus fromString(String value) {
+ if (value == null) {
+ return NOT_EVALUATED;
+ }
+ try {
+ return DataQualityStatus.valueOf(value.toUpperCase());
+ } catch (IllegalArgumentException e) {
+ log.error("Invalid DataQualityStatus value: {}. Returning UNKNOWN.",
value, e);
+ return UNKNOWN;
+ }
+ }
+}
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyCheckResults.java
b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyCheckResults.java
index 6233f3891f..3094b523d6 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyCheckResults.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyCheckResults.java
@@ -19,19 +19,20 @@ package org.apache.gobblin.qualitychecker.task;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
/**
* Wrapper around a Map of PolicyResults and Policy.Type
*/
public class TaskLevelPolicyCheckResults {
- private final Map<TaskLevelPolicy.Result, TaskLevelPolicy.Type> results;
+ private final Map<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> results;
public TaskLevelPolicyCheckResults() {
this.results = new HashMap<>();
}
- public Map<TaskLevelPolicy.Result, TaskLevelPolicy.Type> getPolicyResults() {
+ public Map<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>>
getPolicyResults() {
return this.results;
}
}
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyChecker.java
b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyChecker.java
index 909248a4df..a2c19d2fae 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyChecker.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyChecker.java
@@ -17,8 +17,9 @@
package org.apache.gobblin.qualitychecker.task;
+import java.util.EnumSet;
import java.util.List;
-
+import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,30 +29,25 @@ import org.slf4j.LoggerFactory;
* executes each one, and then stores the output
* in a PolicyCheckResults object
*/
+@Getter
public class TaskLevelPolicyChecker {
- /**
- * An enumeration for possible statuses for Data quality checks,
- * its values will be PASSED, FAILED, in case if data quality check
- * evaluation is not performed for Job, it will be NOT_EVALUATED
- */
- public enum DataQualityStatus {
- PASSED,
- FAILED,
- NOT_EVALUATED
- }
private final List<TaskLevelPolicy> list;
private static final Logger LOG =
LoggerFactory.getLogger(TaskLevelPolicyChecker.class);
+ public static final String TASK_LEVEL_POLICY_RESULT_KEY =
"gobblin.task.level.policy.result";
+
public TaskLevelPolicyChecker(List<TaskLevelPolicy> list) {
this.list = list;
}
public TaskLevelPolicyCheckResults executePolicies() {
TaskLevelPolicyCheckResults results = new TaskLevelPolicyCheckResults();
+
for (TaskLevelPolicy p : this.list) {
TaskLevelPolicy.Result result = p.executePolicy();
- results.getPolicyResults().put(result, p.getType());
- LOG.info("TaskLevelPolicy " + p + " of type " + p.getType() + " executed
with result " + result);
+ results.getPolicyResults().computeIfAbsent(result, r ->
EnumSet.noneOf(TaskLevelPolicy.Type.class))
+ .add(p.getType());
+ LOG.info("TaskLevelPolicy {} of type {} executed with result {}", p,
p.getType(), result);
}
return results;
}
diff --git
a/gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java
b/gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java
new file mode 100644
index 0000000000..0129891748
--- /dev/null
+++
b/gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class FileSizePolicyTest {
+
+ @Test
+ public void testPolicyPass() {
+ State state = new State();
+ state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L);
+ state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 1000L);
+
+ FileSizePolicy policy = new FileSizePolicy(state,
TaskLevelPolicy.Type.FAIL);
+ Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.PASSED);
+ }
+
+ @Test
+ public void testPolicyFail() {
+ State state = new State();
+ state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L);
+ state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 900L);
+
+ FileSizePolicy policy = new FileSizePolicy(state,
TaskLevelPolicy.Type.FAIL);
+ Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
+ }
+
+ @Test
+ public void testMissingProperties() {
+ State state = new State();
+ // No properties set at all
+ FileSizePolicy policy = new FileSizePolicy(state,
TaskLevelPolicy.Type.FAIL);
+ Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
+ }
+
+ @Test
+ public void testPartiallySetProperties() {
+ State state = new State();
+ // Only set bytes read, not bytes written
+ state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L);
+
+ FileSizePolicy policy = new FileSizePolicy(state,
TaskLevelPolicy.Type.FAIL);
+ Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
+
+ // Reset state and only set bytes written, not bytes read
+ state = new State();
+ state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 1000L);
+
+ policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL);
+ Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
+ }
+
+}
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyCheckResults.java
b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/FailingTaskLevelPolicy.java
similarity index 63%
copy from
gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyCheckResults.java
copy to
gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/FailingTaskLevelPolicy.java
index 6233f3891f..d0e907197b 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyCheckResults.java
+++
b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/FailingTaskLevelPolicy.java
@@ -15,23 +15,19 @@
* limitations under the License.
*/
-package org.apache.gobblin.qualitychecker.task;
+package org.apache.gobblin.qualitychecker;
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
-/**
- * Wrapper around a Map of PolicyResults and Policy.Type
- */
-public class TaskLevelPolicyCheckResults {
- private final Map<TaskLevelPolicy.Result, TaskLevelPolicy.Type> results;
-
- public TaskLevelPolicyCheckResults() {
- this.results = new HashMap<>();
+public class FailingTaskLevelPolicy extends TaskLevelPolicy {
+ public FailingTaskLevelPolicy(State state, Type type) {
+ super(state, type);
}
- public Map<TaskLevelPolicy.Result, TaskLevelPolicy.Type> getPolicyResults() {
- return this.results;
+ @Override
+ public Result executePolicy() {
+ return Result.FAILED;
}
}
diff --git
a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/RowCountTaskLevelPolicyTest.java
b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/RowCountTaskLevelPolicyTest.java
index 43736fa318..d986e1687b 100644
---
a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/RowCountTaskLevelPolicyTest.java
+++
b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/RowCountTaskLevelPolicyTest.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.qualitychecker;
+import java.util.Set;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyCheckResults;
import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyChecker;
@@ -46,7 +47,7 @@ public class RowCountTaskLevelPolicyTest {
state.setProp(ConfigurationKeys.WRITER_ROWS_WRITTEN, WRITER_ROWS_WRITTEN);
TaskLevelPolicyCheckResults results = getPolicyResults(state);
- for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry :
results.getPolicyResults().entrySet()) {
+ for (Map.Entry<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> entry :
results.getPolicyResults().entrySet()) {
Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.PASSED);
}
}
@@ -61,7 +62,7 @@ public class RowCountTaskLevelPolicyTest {
state.setProp(ConfigurationKeys.WRITER_ROWS_WRITTEN, -1);
TaskLevelPolicyCheckResults results = getPolicyResults(state);
- for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry :
results.getPolicyResults().entrySet()) {
+ for (Map.Entry<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> entry :
results.getPolicyResults().entrySet()) {
Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.FAILED);
}
}
@@ -77,7 +78,7 @@ public class RowCountTaskLevelPolicyTest {
state.setProp(ConfigurationKeys.ROW_COUNT_RANGE, "0.05");
TaskLevelPolicyCheckResults results = getPolicyResults(state);
- for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry :
results.getPolicyResults().entrySet()) {
+ for (Map.Entry<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> entry :
results.getPolicyResults().entrySet()) {
Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.PASSED);
}
}
@@ -93,7 +94,7 @@ public class RowCountTaskLevelPolicyTest {
state.setProp(ConfigurationKeys.ROW_COUNT_RANGE, "0.05");
TaskLevelPolicyCheckResults results = getPolicyResults(state);
- for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry :
results.getPolicyResults().entrySet()) {
+ for (Map.Entry<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> entry :
results.getPolicyResults().entrySet()) {
Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.PASSED);
}
}
@@ -109,7 +110,7 @@ public class RowCountTaskLevelPolicyTest {
state.setProp(ConfigurationKeys.ROW_COUNT_RANGE, "0.05");
TaskLevelPolicyCheckResults results = getPolicyResults(state);
- for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry :
results.getPolicyResults().entrySet()) {
+ for (Map.Entry<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> entry :
results.getPolicyResults().entrySet()) {
Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.FAILED);
}
}
@@ -126,7 +127,7 @@ public class RowCountTaskLevelPolicyTest {
state.setProp(ConfigurationKeys.ROW_COUNT_RANGE, "0.05");
TaskLevelPolicyCheckResults results = getPolicyResults(state);
- for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry :
results.getPolicyResults().entrySet()) {
+ for (Map.Entry<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> entry :
results.getPolicyResults().entrySet()) {
Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.PASSED);
}
}
diff --git
a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/TaskLevelQualityCheckerTest.java
b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/TaskLevelQualityCheckerTest.java
deleted file mode 100644
index c366faa10d..0000000000
---
a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/TaskLevelQualityCheckerTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.qualitychecker;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
-import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyCheckResults;
-import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyChecker;
-import
org.apache.gobblin.qualitychecker.task.TaskLevelPolicyCheckerBuilderFactory;
-import java.util.Map;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import org.apache.gobblin.configuration.State;
-
-
-@Test(groups = {"gobblin.qualitychecker"})
-public class TaskLevelQualityCheckerTest {
-
- @Test
- public void testPolicyChecker()
- throws Exception {
- State state = new State();
- state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST,
"org.apache.gobblin.qualitychecker.TestTaskLevelPolicy");
- state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST_TYPE, "FAIL");
-
- TaskLevelPolicyCheckResults results = getPolicyResults(state);
- for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry :
results.getPolicyResults().entrySet()) {
- Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.PASSED);
- }
- }
-
- @Test
- public void testMultiplePolicies()
- throws Exception {
- State state = new State();
- state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST,
-
"org.apache.gobblin.qualitychecker.TestTaskLevelPolicy,org.apache.gobblin.qualitychecker.TestTaskLevelPolicy");
- state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST_TYPE, "FAIL,FAIL");
-
- TaskLevelPolicyCheckResults results = getPolicyResults(state);
- for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry :
results.getPolicyResults().entrySet()) {
- Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.PASSED);
- }
- }
-
- private TaskLevelPolicyCheckResults getPolicyResults(State state)
- throws Exception {
- TaskLevelPolicyChecker checker =
- new
TaskLevelPolicyCheckerBuilderFactory().newPolicyCheckerBuilder(state,
-1).build();
- return checker.executePolicies();
- }
-}
diff --git
a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/task/TaskLevelQualityCheckerTest.java
b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/task/TaskLevelQualityCheckerTest.java
new file mode 100644
index 0000000000..484a851787
--- /dev/null
+++
b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/task/TaskLevelQualityCheckerTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.qualitychecker.task;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+@Test(groups = {"gobblin.qualitychecker"})
+public class TaskLevelQualityCheckerTest {
+
+ @Test
+ public void testPolicyChecker()
+ throws Exception {
+ State state = new State();
+ state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST,
"org.apache.gobblin.qualitychecker.TestTaskLevelPolicy");
+ state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST_TYPE, "FAIL");
+
+ TaskLevelPolicyCheckResults results = getPolicyResults(state);
+ for (Map.Entry<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> entry :
results.getPolicyResults().entrySet()) {
+ Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.PASSED);
+ }
+ }
+
+ @Test
+ public void testMultiplePoliciesAllPass()
+ throws Exception {
+ State state = new State();
+ state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST,
+
"org.apache.gobblin.qualitychecker.TestTaskLevelPolicy,org.apache.gobblin.qualitychecker.TestTaskLevelPolicy");
+ state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST_TYPE, "FAIL,FAIL");
+
+ TaskLevelPolicyCheckResults results = getPolicyResults(state);
+ for (Map.Entry<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> entry :
results.getPolicyResults().entrySet()) {
+ Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.PASSED);
+ }
+ }
+
+ @Test
+ public void testMultiplePoliciesAllFail()
+ throws Exception {
+ State state = new State();
+ state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST,
+
"org.apache.gobblin.qualitychecker.FailingTaskLevelPolicy,org.apache.gobblin.qualitychecker.FailingTaskLevelPolicy");
+ state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST_TYPE, "FAIL,FAIL");
+
+ TaskLevelPolicyCheckResults results = getPolicyResults(state);
+ for (Map.Entry<TaskLevelPolicy.Result, Set<TaskLevelPolicy.Type>> entry :
results.getPolicyResults().entrySet()) {
+ Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.FAILED);
+ }
+ }
+
+ @Test
+ public void testMultiplePoliciesAllFailWithDifferentPolicyTypes()
+ throws Exception {
+ State state = new State();
+ state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST,
+
"org.apache.gobblin.qualitychecker.FailingTaskLevelPolicy,org.apache.gobblin.qualitychecker.FailingTaskLevelPolicy");
+ state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST_TYPE,
"OPTIONAL,FAIL");
+
+ TaskLevelPolicyCheckResults results = getPolicyResults(state);
+
Assert.assertEquals(results.getPolicyResults().get(TaskLevelPolicy.Result.FAILED).size(),
2);
+ }
+
+ @Test
+ public void testMultiplePoliciesFailAndPassWithDifferentPolicyTypes()
+ throws Exception {
+ State state = new State();
+ state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST,
+
"org.apache.gobblin.qualitychecker.FailingTaskLevelPolicy,org.apache.gobblin.qualitychecker.FailingTaskLevelPolicy,org.apache.gobblin.qualitychecker.TestTaskLevelPolicy");
+ state.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_LIST_TYPE,
"OPTIONAL,FAIL,FAIL");
+
+ TaskLevelPolicyCheckResults results = getPolicyResults(state);
+
Assert.assertEquals(results.getPolicyResults().get(TaskLevelPolicy.Result.FAILED).size(),
2);
+
Assert.assertEquals(results.getPolicyResults().get(TaskLevelPolicy.Result.PASSED).size(),
1);
+
Assert.assertTrue(results.getPolicyResults().get(TaskLevelPolicy.Result.PASSED).contains(TaskLevelPolicy.Type.FAIL));
+
Assert.assertTrue(results.getPolicyResults().get(TaskLevelPolicy.Result.FAILED).contains(TaskLevelPolicy.Type.FAIL));
+
Assert.assertTrue(results.getPolicyResults().get(TaskLevelPolicy.Result.FAILED).contains(TaskLevelPolicy.Type.OPTIONAL));
+ }
+
+ private TaskLevelPolicyCheckResults getPolicyResults(State state)
+ throws Exception {
+ TaskLevelPolicyChecker checker =
+ new
TaskLevelPolicyCheckerBuilderFactory().newPolicyCheckerBuilder(state,
-1).build();
+ return checker.executePolicies();
+ }
+}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 4413ce5032..0eb50a4607 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -62,6 +62,7 @@ import
org.apache.gobblin.data.management.copy.FileAwareInputStream;
import org.apache.gobblin.data.management.copy.recovery.RecoveryHelper;
import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
import org.apache.gobblin.instrumented.writer.InstrumentedDataWriter;
+import org.apache.gobblin.policies.size.FileSizePolicy;
import org.apache.gobblin.state.ConstructState;
import org.apache.gobblin.util.FileListUtils;
import org.apache.gobblin.util.FinalState;
@@ -212,6 +213,21 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
this.filesWritten.incrementAndGet();
}
+ /**
+ * Records the actual file size in the state after writing.
+ * @param writeAt The path where the file was written
+ * @throws IOException if there is an error getting the file status
+ */
+ private void recordBytesWritten(Path writeAt) throws IOException {
+ try {
+ long actualFileSize = this.fs.getFileStatus(writeAt).getLen();
+ this.state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, actualFileSize);
+ } catch (IOException e) {
+ log.error("Failed to get file status for {} to record bytes written",
writeAt, e);
+ throw e;
+ }
+ }
+
/**
* Write the contents of input stream into staging path.
*
@@ -236,6 +252,9 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
final long blockSize = copyableFile.getBlockSize(this.fs);
final long fileSize = copyableFile.getFileStatus().getLen();
+ // Store source file size in task state
+ this.state.setProp(FileSizePolicy.BYTES_READ_KEY, fileSize);
+
long expectedBytes = fileSize;
Long maxBytes = null;
// Whether writer must write EXACTLY maxBytes.
@@ -308,6 +327,7 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
os.close();
log.info("OutputStream for file {} is closed.", writeAt);
inputStream.close();
+ recordBytesWritten(writeAt);
}
}
}
@@ -457,7 +477,6 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
@Override
public void commit()
throws IOException {
-
if (!this.actualProcessedCopyableFile.isPresent()) {
return;
}
@@ -470,7 +489,6 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
log.info(String.format("Committing data from %s to %s", stagingFilePath,
outputFilePath));
try {
setFilePermissions(copyableFile);
-
Iterator<OwnerAndPermission> ancestorOwnerAndPermissionIt =
copyableFile.getAncestorsOwnerAndPermission() == null ?
Collections.emptyIterator()
: copyableFile.getAncestorsOwnerAndPermission().listIterator();
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/test/IncorrectSizeFileAwareInputStreamDataWriter.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/test/IncorrectSizeFileAwareInputStreamDataWriter.java
new file mode 100644
index 0000000000..b2ffe69830
--- /dev/null
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/test/IncorrectSizeFileAwareInputStreamDataWriter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.writer.test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriter;
+import org.apache.hadoop.fs.Path;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.policies.size.FileSizePolicy;
+import org.apache.gobblin.writer.DataWriter;
+
+/**
+ * A {@link DataWriter} that extends {@link FileAwareInputStreamDataWriter} to
intentionally report incorrect file sizes.
+ * This is useful for testing data quality checks that verify file sizes.
+ *
+ * The writer actually writes the correct data to the destination, but reports
incorrect sizes in the bytesWritten() method.
+ * The size discrepancy can be configured through properties:
+ * - gobblin.copy.incorrect.size.ratio: Ratio to multiply actual size by
(default 1.0)
+ * - gobblin.copy.incorrect.size.offset: Fixed offset to add to actual size
(default 0)
+ */
+@Slf4j
+public class IncorrectSizeFileAwareInputStreamDataWriter extends
FileAwareInputStreamDataWriter {
+
+ public static final String INCORRECT_SIZE_RATIO_KEY =
CopyConfiguration.COPY_PREFIX + ".incorrect.size.ratio";
+ public static final String INCORRECT_SIZE_OFFSET_KEY =
CopyConfiguration.COPY_PREFIX + ".incorrect.size.offset";
+ public static final double DEFAULT_INCORRECT_SIZE_RATIO = 0.9; // Default to
90% of actual size
+ public static final long DEFAULT_INCORRECT_SIZE_OFFSET = 0L;
+
+ private final double sizeRatio;
+ private final long sizeOffset;
+
+ public IncorrectSizeFileAwareInputStreamDataWriter(State state, int
numBranches, int branchId)
+ throws IOException {
+ this(state, numBranches, branchId, null);
+ }
+
+ public IncorrectSizeFileAwareInputStreamDataWriter(State state, int
numBranches, int branchId, String writerAttemptId)
+ throws IOException {
+ super(state, numBranches, branchId, writerAttemptId);
+ this.sizeRatio = state.getPropAsDouble(INCORRECT_SIZE_RATIO_KEY,
DEFAULT_INCORRECT_SIZE_RATIO);
+ if (this.sizeRatio <= 0 || this.sizeRatio > 1) {
+ throw new IllegalArgumentException("Incorrect size ratio must be in the
range (0, 1]: " + this.sizeRatio);
+ }
+ this.sizeOffset = state.getPropAsLong(INCORRECT_SIZE_OFFSET_KEY,
DEFAULT_INCORRECT_SIZE_OFFSET);
+ log.info("Initialized IncorrectSizeFileAwareInputStreamDataWriter with
ratio={}, offset={}",
+ this.sizeRatio, this.sizeOffset);
+ }
+
+ @Override
+ protected void writeImpl(InputStream inputStream, Path writeAt, CopyableFile
copyableFile,
+ FileAwareInputStream record) throws IOException {
+ // First call parent's writeImpl to do the actual writing
+ super.writeImpl(inputStream, writeAt, copyableFile, record);
+
+ // After writing is complete, update the state with actual and incorrect
sizes
+ long actualDestSize = this.fs.getFileStatus(writeAt).getLen();
+ long incorrectBytes = (long)(actualDestSize * this.sizeRatio) +
this.sizeOffset;
+
+ this.state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, incorrectBytes);
+
+ log.info("File size reporting: actual={}, reported={} (ratio={},
offset={})",
+ actualDestSize, incorrectBytes, this.sizeRatio, this.sizeOffset);
+ }
+}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/test/IncorrectSizeFileAwareInputStreamDataWriterBuilder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/test/IncorrectSizeFileAwareInputStreamDataWriterBuilder.java
new file mode 100644
index 0000000000..98d8da6680
--- /dev/null
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/test/IncorrectSizeFileAwareInputStreamDataWriterBuilder.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.writer.test;
+
+import java.io.IOException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.writer.DataWriter;
+import org.apache.gobblin.writer.DataWriterBuilder;
+
+/**
+ * Builder for {@link IncorrectSizeFileAwareInputStreamDataWriter}.
+ */
+public class IncorrectSizeFileAwareInputStreamDataWriterBuilder extends
DataWriterBuilder<String, FileAwareInputStream> {
+
+ @Override
+ public final DataWriter<FileAwareInputStream> build() throws IOException {
+ setJobSpecificOutputPaths(this.destination.getProperties());
+
this.destination.getProperties().setProp(ConfigurationKeys.WRITER_FILE_PATH,
this.writerId);
+ return buildWriter();
+ }
+
+ protected DataWriter<FileAwareInputStream> buildWriter() throws IOException {
+ return new
IncorrectSizeFileAwareInputStreamDataWriter(this.destination.getProperties(),
this.branches, this.branch, this.writerAttemptId);
+ }
+
+ /**
+ * Each job gets its own task-staging and task-output directory. Update the
staging and output directories to
+ * contain job_id. This is to make sure uncleaned data from previous
execution does not corrupt final published data
+ * produced by this execution.
+ */
+ public synchronized static void setJobSpecificOutputPaths(State state) {
+ if
(!StringUtils.containsIgnoreCase(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR),
+ state.getProp(ConfigurationKeys.JOB_ID_KEY))) {
+
+ state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new
Path(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR),
+ state.getProp(ConfigurationKeys.JOB_ID_KEY)));
+ state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new
Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
+ state.getProp(ConfigurationKeys.JOB_ID_KEY)));
+ }
+ }
+}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
index dc79b8aaed..e53d1eae2f 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.RandomStringUtils;
+import org.apache.gobblin.policies.size.FileSizePolicy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -632,6 +633,47 @@ public class FileAwareInputStreamDataWriterTest {
streamString1);
}
+ @Test
+ public void testStateUpdatesAfterWrite() throws Exception {
+ // Create test data
+ String testContent = "testContentsForStateUpdate";
+ byte[] contentBytes = testContent.getBytes(StandardCharsets.UTF_8);
+ long expectedSourceSize = contentBytes.length;
+
+ FileStatus status = fs.getFileStatus(testTempPath);
+ OwnerAndPermission ownerAndPermission =
+ new OwnerAndPermission(status.getOwner(), status.getGroup(), new
FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+ CopyableFile cf =
CopyableFileUtils.getTestCopyableFile(expectedSourceSize, ownerAndPermission);
+ CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new
TestCopyableDataset(new Path("/source")));
+
+ WorkUnitState state = TestUtils.createTestWorkUnitState();
+ state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(testTempPath,
"staging").toString());
+ state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath,
"output").toString());
+ state.setProp(ConfigurationKeys.WRITER_FILE_PATH,
RandomStringUtils.randomAlphabetic(5));
+ CopySource.serializeCopyEntity(state, cf);
+ CopySource.serializeCopyableDataset(state, metadata);
+
+ FileAwareInputStreamDataWriter dataWriter = new
FileAwareInputStreamDataWriter(state, 1, 0);
+ FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder()
+ .file(cf)
+ .inputStream(new ByteArrayInputStream(contentBytes))
+ .build();
+
+ // Write data and verify state updates
+ dataWriter.write(fileAwareInputStream);
+
+ // Verify source file size in state
+ String sourceSizeKey = FileSizePolicy.BYTES_READ_KEY;
+ Assert.assertEquals(state.getPropAsLong(sourceSizeKey), expectedSourceSize,
+ "Source file size should be recorded in state");
+
+ // Verify bytes written in state
+ String destinationSizeKey = FileSizePolicy.BYTES_WRITTEN_KEY;
+ Assert.assertEquals(state.getPropAsLong(destinationSizeKey),
expectedSourceSize,
+ "Bytes written should match source file size");
+
+ }
+
@AfterClass
public void cleanup() {
try {
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 154facfe55..879410c05d 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -83,6 +83,9 @@ public class TimingEvent extends GobblinEventBuilder
implements Closeable {
public static final String FLOW_GROUP_FIELD = "flowGroup";
public static final String FLOW_EXECUTION_ID_FIELD = "flowExecutionId";
public static final String FLOW_EDGE_FIELD = "flowEdge";
+ public static final String FLOW_FABRIC = "fabric";
+ public static final String FLOW_SOURCE = "source";
+ public static final String FLOW_DESTINATION = "destination";
public static final String FLOW_MODIFICATION_TIME_FIELD =
"flowModificationTime";
public static final String JOB_NAME_FIELD = "jobName";
public static final String JOB_GROUP_FIELD = "jobGroup";
@@ -115,6 +118,7 @@ public class TimingEvent extends GobblinEventBuilder
implements Closeable {
public static final String JOB_LAST_PROGRESS_EVENT_TIME =
"jobLastProgressEventTime";
public static final String JOB_COMPLETION_PERCENTAGE =
"jobCompletionPercentage";
public static final String DATASET_TASK_SUMMARIES = "datasetTaskSummaries";
+ public static final String DATASET_URN = "datasetUrn";
@Getter
private Long startTime;
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 021ce11cf0..57e95508b6 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -22,6 +22,12 @@ public class ServiceMetricNames {
public static final String GOBBLIN_SERVICE_PREFIX = "GobblinService";
public static final String GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER =
GOBBLIN_SERVICE_PREFIX + ".";
public static final String GOBBLIN_JOB_METRICS_PREFIX = "JobMetrics";
+ public static final String DATA_QUALITY_JOB_SUCCESS_COUNT =
"dataQualitySuccessCount";
+ public static final String DATA_QUALITY_JOB_FAILURE_COUNT =
"dataQualityFailureCount";
+ public static final String DATA_QUALITY_OVERALL_FILE_COUNT =
"dataQualityOverallFileCount";
+ public static final String DATA_QUALITY_SUCCESS_FILE_COUNT =
"dataQualitySuccessFileCount";
+ public static final String DATA_QUALITY_FAILURE_FILE_COUNT =
"dataQualityFailureFileCount";
+ public static final String DATA_QUALITY_NON_EVALUATED_FILE_COUNT =
"dataQualityNonEvaluatedFileCount";
// Flow Compilation Meters and Timer
public static final String FLOW_COMPILATION_SUCCESSFUL_METER =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowCompilation.successful";
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java
new file mode 100644
index 0000000000..959f57053c
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+
+import java.util.List;
+import java.util.Properties;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+ private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
"gobblin.gaas.observability";
+
+ // Private constructor to prevent instantiation
+ private DataQualityEvaluator() {
+ }
+
+ /**
+ * Result of a data quality evaluation containing the overall status and
metrics.
+ */
+ @Getter
+ @AllArgsConstructor
+ public static class DataQualityEvaluationResult {
+ private final DataQualityStatus qualityStatus;
+ // Total number of files evaluated for data quality
+ private final int totalFiles;
+ // Number of files that passed data quality checks
+ private final int passedFiles;
+ // Number of files that failed data quality checks
+ private final int failedFiles;
+ // Number of files that were not evaluated for data quality for example
files not found or not processed
+ private final int nonEvaluatedFiles;
+ }
+
+ /**
+ * Evaluates the data quality of a dataset state and stores the result.
+ * This method is specifically designed for dataset-level quality evaluation.
+ *
+ * @param datasetState The dataset state to evaluate and update
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState,
+ JobState jobState) {
+ List<TaskState> taskStates = datasetState.getTaskStates();
+ DataQualityEvaluationResult result = evaluateDataQuality(taskStates,
jobState);
+
+ // Store the result in the dataset state
+ jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY,
result.getQualityStatus().name());
+ // Emit dataset-specific metrics
+ emitMetrics(jobState, result.getQualityStatus(), result.getTotalFiles(),
result.getPassedFiles(),
+ result.getFailedFiles(), result.getNonEvaluatedFiles(),
datasetState.getDatasetUrn());
+
+ return result;
+ }
+
+ /**
+ * Evaluates the data quality of a set of task states and emits relevant
metrics.
+ *
+ * @param taskStates List of task states to evaluate
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+ DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED;
+ int totalFiles = 0;
+ int failedFilesCount = 0;
+ int passedFilesCount = 0;
+ int nonEvaluatedFilesCount = 0;
+
+ for (TaskState taskState : taskStates) {
+ totalFiles++;
+
+ // Handle null task states gracefully
+ if (taskState == null) {
+ log.warn("Encountered null task state, skipping data quality
evaluation for this task");
+ nonEvaluatedFilesCount++;
+ continue;
+ }
+
+ DataQualityStatus taskDataQuality = null;
+ String result =
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+ taskDataQuality = DataQualityStatus.fromString(result);
+ if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+ log.debug("Data quality status of this task is: " + taskDataQuality);
+ if (DataQualityStatus.PASSED == taskDataQuality) {
+ passedFilesCount++;
+ } else if (DataQualityStatus.FAILED == taskDataQuality) {
+ failedFilesCount++;
+ jobDataQualityStatus = DataQualityStatus.FAILED;
+ } else {
+ log.warn("Unexpected data quality status: " + taskDataQuality + "
for task: " + taskState.getTaskId());
+ }
+ } else {
+ // Handle files without data quality evaluation
+ nonEvaluatedFilesCount++;
+ log.warn("No data quality evaluation for task: " +
taskState.getTaskId());
+ }
+ }
+
+ // Log summary of evaluation
+ log.info("Data quality evaluation summary - Total: {}, Passed: {}, Failed:
{}, Not Evaluated: {}", totalFiles,
+ passedFilesCount, failedFilesCount, nonEvaluatedFilesCount);
+ return new DataQualityEvaluationResult(jobDataQualityStatus, totalFiles,
passedFilesCount, failedFilesCount,
+ nonEvaluatedFilesCount);
+ }
+
+ private static void emitMetrics(JobState jobState, final DataQualityStatus
jobDataQuality, final int totalFiles,
+ final int passedFilesCount, final int failedFilesCount, final int
nonEvaluatedFilesCount,
+ final String datasetUrn) {
+ try {
+ // Check if OpenTelemetry is enabled
+ boolean otelEnabled =
jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
+ ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED);
+
+ if (!otelEnabled) {
+ log.info("OpenTelemetry metrics disabled, skipping metrics emission");
+ return;
+ }
+
+ OpenTelemetryMetricsBase otelMetrics =
OpenTelemetryMetrics.getInstance(jobState);
+ if (otelMetrics == null) {
+ log.warn("OpenTelemetry metrics instance is null, skipping metrics
emission");
+ return;
+ }
+ log.info("OpenTelemetry instance obtained");
+
+ Meter meter = otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME);
+ Attributes tags = getTagsForDataQualityMetrics(jobState, datasetUrn);
+ log.info("Emitting DQ metrics for job={}, status={}, tags={}",
jobState.getJobName(), jobDataQuality, tags);
+ String jobMetricDescription = "Number of Jobs with data quality status:
" + jobDataQuality;
+ String jobMetricName =
+ (jobDataQuality == DataQualityStatus.PASSED) ?
ServiceMetricNames.DATA_QUALITY_JOB_SUCCESS_COUNT
+ : ServiceMetricNames.DATA_QUALITY_JOB_FAILURE_COUNT;
+ log.info("Data quality status for job: {} is {}", jobState.getJobName(),
jobDataQuality);
+
meter.counterBuilder(jobMetricName).setDescription(jobMetricDescription).build().add(1,
tags);
+
+ // Emit overall files count
+ meter.counterBuilder(ServiceMetricNames.DATA_QUALITY_OVERALL_FILE_COUNT)
+ .setDescription("Number of files evaluated for data
quality").build().add(totalFiles, tags);
+
+ // Emit passed files count
+ meter.counterBuilder(ServiceMetricNames.DATA_QUALITY_SUCCESS_FILE_COUNT)
+ .setDescription("Number of files that passed data
quality").build().add(passedFilesCount, tags);
+
+ // Emit failed files count
+ meter.counterBuilder(ServiceMetricNames.DATA_QUALITY_FAILURE_FILE_COUNT)
+ .setDescription("Number of files that failed data
quality").build().add(failedFilesCount, tags);
+
+ // Emit non-evaluated files count
+
meter.counterBuilder(ServiceMetricNames.DATA_QUALITY_NON_EVALUATED_FILE_COUNT)
+ .setDescription("Number of files that did not have data quality
evaluation").build()
+ .add(nonEvaluatedFilesCount, tags);
+ } catch (Exception e) {
+ log.error("Error in emitMetrics for job: {}", jobState.getJobName(), e);
+ }
+ log.info("Completed emitMetrics for job: {}", jobState.getJobName());
+ }
+
+ private static Attributes getTagsForDataQualityMetrics(JobState jobState,
String datasetUrn) {
+ Properties jobProperties = jobState.getProperties();
+ log.info("Job properties loaded: " + jobProperties);
+
+ return
Attributes.builder().put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobState.getJobName())
+ .put(TimingEvent.DATASET_URN, datasetUrn)
+ .put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
jobProperties.getProperty(ConfigurationKeys.FLOW_NAME_KEY))
+ .put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+ jobProperties.getProperty(ConfigurationKeys.FLOW_GROUP_KEY))
+ .put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+ jobProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
+ .put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
+ jobProperties.getProperty(ConfigurationKeys.FLOW_EDGE_ID_KEY))
+ .put(TimingEvent.FlowEventConstants.FLOW_FABRIC,
+
jobProperties.getProperty(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_FABRIC,
null))
+ .put(TimingEvent.FlowEventConstants.FLOW_SOURCE,
+
jobProperties.getProperty(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, ""))
+ .put(TimingEvent.FlowEventConstants.FLOW_DESTINATION,
+
jobProperties.getProperty(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY,
""))
+ .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
+ jobProperties.getProperty(ConfigurationKeys.FLOW_SPEC_EXECUTOR,
"")).build();
+ }
+}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index ca6b80bd2f..7acda0884d 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -752,12 +752,12 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
}
LOG.info(String.format("DatasetMetrics for '%s' - (records: %d; bytes:
%d)", datasetState.getDatasetUrn(), totalRecordsWritten, totalBytesWritten));
datasetTaskSummaries.add(new
DatasetTaskSummary(datasetState.getDatasetUrn(), totalRecordsWritten,
totalBytesWritten,
- datasetState.getState() == JobState.RunningState.COMMITTED));
+ datasetState.getState() == JobState.RunningState.COMMITTED,
jobState.getDataQualityStatus().name()));
} else if (datasetState.getState() == JobState.RunningState.FAILED) {
// Check if config is turned on for submitting writer metrics on
failure due to non-atomic write semantics
if (this.jobContext.getJobCommitPolicy() ==
JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
LOG.info("Due to task failure, will report that no records or bytes
were written for " + datasetState.getDatasetUrn());
- datasetTaskSummaries.add(new
DatasetTaskSummary(datasetState.getDatasetUrn(), 0, 0, false));
+ datasetTaskSummaries.add(new
DatasetTaskSummary(datasetState.getDatasetUrn(), 0, 0, false,
jobState.getDataQualityStatus().name()));
}
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
index 1aebbc2f86..f8c1d2a1e0 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
@@ -24,9 +24,8 @@ import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
+import lombok.ToString;
import org.apache.gobblin.metrics.DatasetMetric;
-import org.apache.gobblin.qualitychecker.task.TaskLevelPolicyChecker;
-
/**
* A class returned by {@link org.apache.gobblin.runtime.SafeDatasetCommit} to
provide metrics for the dataset
@@ -35,18 +34,19 @@ import
org.apache.gobblin.qualitychecker.task.TaskLevelPolicyChecker;
@Data
@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable
deserialization
@RequiredArgsConstructor
-@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@NoArgsConstructor
+@ToString
public class DatasetTaskSummary {
@NonNull private String datasetUrn;
@NonNull private long recordsWritten;
@NonNull private long bytesWritten;
@NonNull private boolean successfullyCommitted;
+ @NonNull private String dataQualityStatus;
/**
* Convert a {@link DatasetTaskSummary} to a {@link DatasetMetric}.
*/
public static DatasetMetric toDatasetMetric(DatasetTaskSummary
datasetTaskSummary) {
- return new DatasetMetric(datasetTaskSummary.getDatasetUrn(),
datasetTaskSummary.getBytesWritten(), datasetTaskSummary.getRecordsWritten(),
datasetTaskSummary.isSuccessfullyCommitted(),
- TaskLevelPolicyChecker.DataQualityStatus.NOT_EVALUATED.name());
+ return new DatasetMetric(datasetTaskSummary.getDatasetUrn(),
datasetTaskSummary.getBytesWritten(), datasetTaskSummary.getRecordsWritten(),
datasetTaskSummary.isSuccessfullyCommitted(),
datasetTaskSummary.getDataQualityStatus());
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
index cf53d560ac..911cb0aec7 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java
@@ -26,6 +26,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
/**
@@ -48,9 +49,12 @@ public class ForkThrowableHolder {
return throwables.isEmpty();
}
- public ForkException getAggregatedException (List<Integer> failedForkIds,
String taskId) {
+ public ForkException getAggregatedException (List<Integer> failedForkIds,
String taskId, DataQualityStatus taskDataQuality) {
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append("Fork branches " + failedForkIds + " failed for task "
+ taskId + "\n");
+ if(DataQualityStatus.FAILED == taskDataQuality){
+ stringBuffer.append("DataQuality failed for task: " + taskId + "\n");
+ }
for (Integer idx: failedForkIds) {
stringBuffer.append("<Fork " + idx + ">\n");
if (this.throwables.containsKey(idx)) {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index 3f6dca4a92..e591ad6859 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -516,7 +516,7 @@ public class JobContext implements Closeable {
DeliverySemantics deliverySemantics, String datasetUrn,
JobState.DatasetState datasetState,
boolean isMultithreaded, JobContext jobContext) {
return new SafeDatasetCommit(shouldCommitDataInJob, isJobCancelled,
deliverySemantics, datasetUrn, datasetState,
- isMultithreaded, jobContext);
+ isMultithreaded, jobContext, SafeDatasetCommit.COMMIT_SRC_JOB_CONTEXT);
}
protected Map<String, JobState.DatasetState> computeDatasetStatesByUrns() {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
index d61777f833..8d1bc890da 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
@@ -30,25 +30,15 @@ import java.util.Properties;
import lombok.Getter;
import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.runtime.job.JobProgress;
+import org.apache.gobblin.runtime.util.MetricGroup;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.io.Text;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Meter;
-import com.google.common.base.Enums;
-import com.google.common.base.Optional;
-import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.gson.stream.JsonWriter;
-import com.linkedin.data.template.StringMap;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
@@ -63,13 +53,24 @@ import org.apache.gobblin.rest.MetricTypeEnum;
import org.apache.gobblin.rest.TaskExecutionInfoArray;
import org.apache.gobblin.runtime.api.MonitoredObject;
import org.apache.gobblin.runtime.util.JobMetrics;
-import org.apache.gobblin.runtime.util.MetricGroup;
-import org.apache.gobblin.source.extractor.JobCommitPolicy;
-import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.google.common.base.Enums;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.gson.stream.JsonWriter;
+import com.linkedin.data.template.StringMap;
+import org.apache.hadoop.io.Text;
import org.apache.gobblin.util.ImmutableProperties;
import org.apache.gobblin.util.JobLauncherUtils;
-
/**
* A class for tracking job state information.
*
@@ -94,6 +95,8 @@ public class JobState extends SourceState implements
JobProgress {
* <li> SUCCESSFUL => CANCELLED (cancelled before committing)
* </ul>
*/
+ public static final String GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX =
"GaaSJobObservabilityEventProducer.";
+ public static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics";
public enum RunningState implements MonitoredObject {
/** Pending creation of {@link WorkUnit}s. */
PENDING,
@@ -588,6 +591,15 @@ public class JobState extends SourceState implements
JobProgress {
.name("completed tasks").value(this.getCompletedTasks());
}
+ /**
+ * Gets the overall data quality status of the job.
+ * @return the {@link DataQualityStatus} parsed from the job's stored
quality-status property (e.g. PASSED or FAILED); may be NOT_EVALUATED
when no quality check result was recorded
+ */
+ public DataQualityStatus getDataQualityStatus() {
+ return
DataQualityStatus.fromString(super.getProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY));
+ }
+
+
protected void propsToJson(JsonWriter jsonWriter)
throws IOException {
jsonWriter.beginObject();
@@ -756,6 +768,7 @@ public class JobState extends SourceState implements
JobProgress {
* and {@link #setProp(String, Object)} are not supported.
* </p>
*/
+ @Slf4j
public static class DatasetState extends JobState {
// For serialization/deserialization
@@ -794,6 +807,7 @@ public class JobState extends SourceState implements
JobProgress {
jsonWriter.beginObject();
jsonWriter.name(ConfigurationKeys.DATASET_URN_KEY).value(getDatasetUrn());
jsonWriter.name(ConfigurationKeys.JOB_FAILURES_KEY).value(getJobFailures());
+
jsonWriter.name(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY).value(getDataQualityStatus().name());
jsonWriter.endObject();
}
@@ -831,7 +845,10 @@ public class JobState extends SourceState implements
JobProgress {
protected void writeStateSummary(JsonWriter jsonWriter)
throws IOException {
super.writeStateSummary(jsonWriter);
+
jsonWriter.name(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY).value(getDataQualityStatus().name());
jsonWriter.name("datasetUrn").value(getDatasetUrn());
}
+
+
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index 70a9d9a96e..de355e47d0 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -54,6 +54,7 @@ import
org.apache.gobblin.runtime.commit.DatasetStateCommitStep;
import org.apache.gobblin.runtime.task.TaskFactory;
import org.apache.gobblin.runtime.task.TaskUtils;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.quality.DataQualityEvaluator;
/**
@@ -69,6 +70,8 @@ public final class SafeDatasetCommit implements
Callable<Void> {
private static final String DATASET_STATE = "datasetState";
private static final String FAILED_DATASET_EVENT = "failedDataset";
+ public static final String COMMIT_SRC_JOB_CONTEXT = "JobContext";
+ public static final String COMMIT_SRC_COMMIT_ACTIVITY_IMPL =
"CommitActivityImpl";
private final boolean shouldCommitDataInJob;
private final boolean isJobCancelled;
@@ -77,6 +80,7 @@ public final class SafeDatasetCommit implements
Callable<Void> {
private final JobState.DatasetState datasetState;
private final boolean isMultithreaded;
private final JobContext jobContext;
+ private final String datasetCommitSrc;
private MetricContext metricContext;
@@ -90,6 +94,14 @@ public final class SafeDatasetCommit implements
Callable<Void> {
metricContext = Instrumented.getMetricContext(datasetState,
SafeDatasetCommit.class);
finalizeDatasetStateBeforeCommit(this.datasetState);
+ // evaluate data quality at the dataset commit level, only when commit
source is CommitActivityImpl
+ if
(SafeDatasetCommit.COMMIT_SRC_COMMIT_ACTIVITY_IMPL.equals(this.datasetCommitSrc))
{
+ log.info("Evaluating data quality for commit activity for dataset {}.",
this.datasetUrn);
+ evaluateAndEmitDatasetQuality();
+ } else {
+ log.info("Skipping data quality evaluation for dataset {} as commit
source is {}", this.datasetUrn,
+ this.datasetCommitSrc);
+ }
Class<? extends DataPublisher> dataPublisherClass;
try (Closer closer = Closer.create()) {
dataPublisherClass =
JobContext.getJobDataPublisherClass(this.jobContext.getJobState())
@@ -438,4 +450,14 @@ public final class SafeDatasetCommit implements
Callable<Void> {
.withDatasetState(datasetState).build());
}
+ /**
+ * Evaluates and stores the data quality status for the dataset.
+ * This method handles the business logic of data quality evaluation
+ * at the dataset commit level, which is more appropriate than having
+ * it in the JobState data container.
+ */
+ private void evaluateAndEmitDatasetQuality() {
+ DataQualityEvaluator.evaluateAndReportDatasetQuality(this.datasetState,
this.jobContext.getJobState());
+ }
+
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 7ddb4f11f2..2dd2e83f5d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -32,6 +32,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.lang3.BooleanUtils;
+import org.mortbay.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -96,6 +97,7 @@ import org.apache.gobblin.writer.TrackerBasedWatermarkManager;
import org.apache.gobblin.writer.WatermarkAwareWriter;
import org.apache.gobblin.writer.WatermarkManager;
import org.apache.gobblin.writer.WatermarkStorage;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
/**
@@ -316,6 +318,32 @@ public class Task implements TaskIFace {
this.shutdownLatch.countDown();
}
+ private void computeAndUpdateTaskDataQuality() {
+ DataQualityStatus overallTaskDataQuality = DataQualityStatus.PASSED;
+ for (Optional<Fork> fork : this.forks.keySet()) {
+ if (fork.isPresent()) {
+ TaskState forkTaskState = fork.get().getTaskState();
+ if (forkTaskState != null) {
+ String result =
forkTaskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+ DataQualityStatus forkDataQualityStatus = null;
+ try {
+ if (result != null) {
+ forkDataQualityStatus = DataQualityStatus.valueOf(result);
+ }
+ } catch (IllegalArgumentException e) {
+ Log.warn("Unknown data quality status encountered: " + result);
+ forkDataQualityStatus = DataQualityStatus.UNKNOWN;
+ }
+ if (DataQualityStatus.FAILED == forkDataQualityStatus) {
+ overallTaskDataQuality = DataQualityStatus.FAILED;
+ }
+ }
+ }
+ }
+ LOG.info("Data quality state of the task is {}", overallTaskDataQuality);
+ this.taskState.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY,
overallTaskDataQuality.name());
+ }
+
private boolean shutdownRequested() {
if (!this.shutdownRequested.get()) {
this.shutdownRequested.set(Thread.currentThread().isInterrupted());
@@ -385,7 +413,7 @@ public class Task implements TaskIFace {
if (failedForksId.size() == 1 &&
holder.getThrowable(failedForksId.get(0)).isPresent()) {
e = holder.getThrowable(failedForksId.get(0)).get();
}else{
- e = holder.getAggregatedException(failedForksId, this.taskId);
+ e = holder.getAggregatedException(failedForksId, this.taskId,
null);
}
}
throw e == null ? new RuntimeException("Some forks failed") : e;
@@ -927,7 +955,7 @@ public class Task implements TaskIFace {
}
}
}
-
+ this.computeAndUpdateTaskDataQuality();
if (failedForkIds.size() == 0) {
// Set the task state to SUCCESSFUL. The state is not set to COMMITTED
// as the data publisher will do that upon successful data publishing.
@@ -942,7 +970,14 @@ public class Task implements TaskIFace {
if (failedForkIds.size() == 1 &&
holder.getThrowable(failedForkIds.get(0)).isPresent()) {
failTask(holder.getThrowable(failedForkIds.get(0)).get());
} else {
- failTask(holder.getAggregatedException(failedForkIds,
this.taskId));
+ String taskDataQualityString =
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+ DataQualityStatus dataQualityStatus =
DataQualityStatus.NOT_EVALUATED;
+ if (taskDataQualityString != null) {
+ dataQualityStatus =
DataQualityStatus.valueOf(taskDataQualityString);
+ } else {
+ Log.warn("Task data quality status is not set, defaulting to
NOT_EVALUATED for taskId: {}", this.taskId);
+ }
+ failTask(holder.getAggregatedException(failedForkIds, this.taskId,
dataQualityStatus));
}
} else {
// just in case there are some corner cases where Fork throw an
exception but doesn't add into holder
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index ed83c0d538..0ff7cde453 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -21,6 +21,8 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -352,6 +354,20 @@ public class Fork<S, D> implements Closeable, FinalState,
RecordStreamConsumer<S
return parentTaskDone;
}
+
+ /**
+ * Get the {@link TaskState} associated with this {@link Fork}.
+ *
+ * <p>
+ * The {@link TaskState} contains information about the execution state of
this {@link Fork},
+ * including metrics and other runtime data. This method allows access to
the {@link TaskState}
+ * for monitoring or reporting purposes.
+ * </p>
+ *
+ * @return the {@link TaskState} of the {@link Fork}.
+ */
+ public TaskState getTaskState() { return this.forkTaskState; }
+
/**
* Update record-level metrics.
*/
@@ -377,7 +393,8 @@ public class Fork<S, D> implements Closeable, FinalState,
RecordStreamConsumer<S
*/
public boolean commit() {
try {
- if (checkDataQuality(this.convertedSchema)) {
+ boolean dataQualityEnforced =
this.forkTaskState.getPropAsBoolean(ConfigurationKeys.ENFORCE_DATA_QUALITY_FAILURE_KEY,
ConfigurationKeys.DEFAULT_ENFORCE_DATA_QUALITY_FAILURE);
+ if (checkDataQuality(this.convertedSchema) || !dataQualityEnforced) {
// Commit data if all quality checkers pass. Again, not to catch the
exception
// it may throw so the exception gets propagated to the caller of this
method.
this.logger.info(String.format("Committing data for fork %d of task
%s", this.index, this.taskId));
@@ -613,6 +630,11 @@ public class Fork<S, D> implements Closeable, FinalState,
RecordStreamConsumer<S
TaskLevelPolicyCheckResults taskResults =
this.taskContext.getTaskLevelPolicyChecker(this.forkTaskState,
this.branches > 1 ? this.index : -1)
.executePolicies();
+ boolean hasFailureForMandatoryPolicy = taskResults.getPolicyResults()
+ .getOrDefault(TaskLevelPolicy.Result.FAILED,
java.util.Collections.emptySet())
+ .contains(TaskLevelPolicy.Type.FAIL);
+ forkTaskState.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY,
+ hasFailureForMandatoryPolicy ? DataQualityStatus.FAILED.name() :
DataQualityStatus.PASSED.name());
TaskPublisher publisher =
this.taskContext.getTaskPublisher(this.forkTaskState, taskResults);
switch (publisher.canPublish()) {
case SUCCESS:
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java
index b2e3e8b09b..4a6ebee5e9 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java
@@ -89,6 +89,7 @@ public class LocalTaskStateTracker extends
AbstractTaskStateTracker {
try {
// Check the task state and handle task retry if task failed and
// it has not reached the maximum number of retries
+
WorkUnitState.WorkingState state = task.getTaskState().getWorkingState();
if (state == WorkUnitState.WorkingState.FAILED && task.getRetryCount() <
this.maxTaskRetries) {
this.taskExecutor.retry(task);
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/quality/DataQualityEvaluatorTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/quality/DataQualityEvaluatorTest.java
new file mode 100644
index 0000000000..32bf9658c7
--- /dev/null
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/quality/DataQualityEvaluatorTest.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link DataQualityEvaluator}
+ */
+@Test(groups = {"gobblin.quality"})
+public class DataQualityEvaluatorTest {
+
+ @Test
+ public void testDataQualityEvaluationWithAllPassedTasks() {
+ // Create job state with properties
+ JobState jobState = new JobState("TestJob", "TestJob-1");
+ jobState.setProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
"TestFlow");
+ jobState.setProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
"TestGroup");
+ jobState.setProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
"12345");
+ jobState.setProp(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, "edge1");
+ jobState.setProp(ServiceConfigKeys.GOBBLIN_SERVICE_INSTANCE_NAME,
"test-instance");
+ jobState.setProp(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
"test-executor");
+ jobState.setProp(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY,
"test-source");
+ jobState.setProp(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY,
"test-destination");
+
+ // Create task states with PASSED quality status
+ List<TaskState> taskStates = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ TaskState taskState = new TaskState();
+ taskState.setTaskId("Task-" + i);
+ taskState.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY,
DataQualityStatus.PASSED.name());
+ taskStates.add(taskState);
+ }
+
+ // Evaluate data quality
+ DataQualityEvaluator.DataQualityEvaluationResult result =
+ DataQualityEvaluator.evaluateDataQuality(taskStates, jobState);
+
+ // Verify results
+ Assert.assertEquals(result.getQualityStatus(), DataQualityStatus.PASSED);
+ Assert.assertEquals(result.getTotalFiles(), 3);
+ Assert.assertEquals(result.getPassedFiles(), 3);
+ Assert.assertEquals(result.getFailedFiles(), 0);
+ Assert.assertEquals(result.getNonEvaluatedFiles(), 0);
+ }
+
+ @Test
+ public void testDataQualityEvaluationWithMixedResults() {
+ // Create job state
+ JobState jobState = new JobState("TestJob", "TestJob-1");
+ jobState.setProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
"TestFlow");
+ jobState.setProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
"TestGroup");
+
+ // Create task states with mixed quality status
+ List<TaskState> taskStates = new ArrayList<>();
+
+ // Passed task
+ TaskState passedTask = new TaskState();
+ passedTask.setTaskId("Task-1");
+ passedTask.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY,
DataQualityStatus.PASSED.name());
+ taskStates.add(passedTask);
+
+ // Failed task
+ TaskState failedTask = new TaskState();
+ failedTask.setTaskId("Task-2");
+ failedTask.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY,
DataQualityStatus.FAILED.name());
+ taskStates.add(failedTask);
+
+ // Not evaluated task
+ TaskState notEvaluatedTask = new TaskState();
+ notEvaluatedTask.setTaskId("Task-3");
+ notEvaluatedTask.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY,
DataQualityStatus.NOT_EVALUATED.name());
+ taskStates.add(notEvaluatedTask);
+
+ // Evaluate data quality
+ DataQualityEvaluator.DataQualityEvaluationResult result =
+ DataQualityEvaluator.evaluateDataQuality(taskStates, jobState);
+
+ // Verify results
+ Assert.assertEquals(result.getQualityStatus(), DataQualityStatus.FAILED);
+ Assert.assertEquals(result.getTotalFiles(), 3);
+ Assert.assertEquals(result.getPassedFiles(), 1);
+ Assert.assertEquals(result.getFailedFiles(), 1);
+ Assert.assertEquals(result.getNonEvaluatedFiles(), 1);
+ }
+
+ @Test
+ public void testJobPropertiesRetrieval() {
+ // Create job state with various properties
+ JobState jobState = new JobState("TestJob", "TestJob-1");
+ jobState.setProp("test.property1", "value1");
+ jobState.setProp("test.property2", "value2");
+ jobState.setProp(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY,
"test-source");
+ jobState.setProp(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY,
"test-destination");
+
+ // Get properties directly from JobState
+ Properties jobProperties = jobState.getProperties();
+
+ // Verify properties are correctly retrieved
+ Assert.assertEquals(jobProperties.getProperty("test.property1"), "value1");
+ Assert.assertEquals(jobProperties.getProperty("test.property2"), "value2");
+
Assert.assertEquals(jobProperties.getProperty(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY),
"test-source");
+
Assert.assertEquals(jobProperties.getProperty(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY),
+ "test-destination");
+ }
+
+ @Test
+ public void testDatasetQualityEvaluation() {
+ // Create job state
+ JobState jobState = new JobState("TestJob", "TestJob-1");
+ jobState.setProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
"TestFlow");
+ jobState.setProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
"TestGroup");
+
+ // Create dataset state
+ JobState.DatasetState datasetState = new JobState.DatasetState("TestJob",
"TestJob-1");
+ datasetState.setDatasetUrn("test://dataset/urn");
+
+ // Create task states
+ List<TaskState> taskStates = new ArrayList<>();
+ TaskState taskState = new TaskState();
+ taskState.setTaskId("Task-1");
+ taskState.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY,
DataQualityStatus.PASSED.name());
+ taskStates.add(taskState);
+
+ // Add task states to dataset state
+ for (TaskState ts : taskStates) {
+ datasetState.addTaskState(ts);
+ }
+ // Evaluate dataset quality
+ DataQualityEvaluator.DataQualityEvaluationResult result =
+ DataQualityEvaluator.evaluateAndReportDatasetQuality(datasetState,
jobState);
+
+ // Verify results
+ Assert.assertEquals(result.getQualityStatus(), DataQualityStatus.PASSED);
+ Assert.assertEquals(result.getTotalFiles(), 1);
+ Assert.assertEquals(result.getPassedFiles(), 1);
+ Assert.assertEquals(result.getFailedFiles(), 0);
+ Assert.assertEquals(result.getNonEvaluatedFiles(), 0);
+
+ // Verify dataset state was updated
+ //Assert.assertEquals(datasetState.getDataQualityStatus(),
DataQualityStatus.PASSED);
+ }
+
+ @Test
+ public void testDataQualityEvaluationWithNullTaskStates() {
+ // Create job state
+ JobState jobState = new JobState("TestJob", "TestJob-1");
+ jobState.setProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
"TestFlow");
+ jobState.setProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
"TestGroup");
+
+ // Create task states with some null entries
+ List<TaskState> taskStates = new ArrayList<>();
+
+ // Valid passed task
+ TaskState passedTask = new TaskState();
+ passedTask.setTaskId("Task-1");
+ passedTask.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY,
DataQualityStatus.PASSED.name());
+ taskStates.add(passedTask);
+
+ // Null task state
+ taskStates.add(null);
+
+ // Valid failed task
+ TaskState failedTask = new TaskState();
+ failedTask.setTaskId("Task-3");
+ failedTask.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY,
DataQualityStatus.FAILED.name());
+ taskStates.add(failedTask);
+
+ // Another null task state
+ taskStates.add(null);
+
+ // Valid not evaluated task
+ TaskState notEvaluatedTask = new TaskState();
+ notEvaluatedTask.setTaskId("Task-5");
+ notEvaluatedTask.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY,
DataQualityStatus.NOT_EVALUATED.name());
+ taskStates.add(notEvaluatedTask);
+
+ // Evaluate data quality
+ DataQualityEvaluator.DataQualityEvaluationResult result =
+ DataQualityEvaluator.evaluateDataQuality(taskStates, jobState);
+
+ // Verify results - should handle null task states gracefully
+ // The method should process only non-null task states
+ Assert.assertEquals(result.getQualityStatus(), DataQualityStatus.FAILED);
+ Assert.assertEquals(result.getTotalFiles(), 5); // Total includes null
entries
+ Assert.assertEquals(result.getPassedFiles(), 1);
+ Assert.assertEquals(result.getFailedFiles(), 1);
+ Assert.assertEquals(result.getNonEvaluatedFiles(), 3);
+ }
+
+ @Test
+ public void testDataQualityEvaluationWithAllNullTaskStates() {
+ // Create job state
+ JobState jobState = new JobState("TestJob", "TestJob-1");
+ jobState.setProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
"TestFlow");
+ jobState.setProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
"TestGroup");
+
+ // Create list with all null task states
+ List<TaskState> taskStates = new ArrayList<>();
+ taskStates.add(null);
+ taskStates.add(null);
+ taskStates.add(null);
+
+ // Evaluate data quality
+ DataQualityEvaluator.DataQualityEvaluationResult result =
+ DataQualityEvaluator.evaluateDataQuality(taskStates, jobState);
+
+ // Verify results - should handle all null task states gracefully
+ Assert.assertEquals(result.getQualityStatus(), DataQualityStatus.PASSED);
// Default status when no failures
+ Assert.assertEquals(result.getTotalFiles(), 3);
+ Assert.assertEquals(result.getPassedFiles(), 0);
+ Assert.assertEquals(result.getFailedFiles(), 0);
+ Assert.assertEquals(result.getNonEvaluatedFiles(), 3);
+ }
+
+ @Test
+ public void testDataQualityEvaluationWithEmptyTaskStatesList() {
+ // Create job state
+ JobState jobState = new JobState("TestJob", "TestJob-1");
+ jobState.setProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
"TestFlow");
+ jobState.setProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
"TestGroup");
+
+ // Create empty task states list
+ List<TaskState> taskStates = new ArrayList<>();
+
+ // Evaluate data quality
+ DataQualityEvaluator.DataQualityEvaluationResult result =
+ DataQualityEvaluator.evaluateDataQuality(taskStates, jobState);
+
+ // Verify results - should handle empty list gracefully
+ Assert.assertEquals(result.getQualityStatus(), DataQualityStatus.PASSED);
// Default status when no tasks
+ Assert.assertEquals(result.getTotalFiles(), 0);
+ Assert.assertEquals(result.getPassedFiles(), 0);
+ Assert.assertEquals(result.getFailedFiles(), 0);
+ Assert.assertEquals(result.getNonEvaluatedFiles(), 0);
+ }
+}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobStateTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobStateTest.java
index 8bcca84b27..b4dcd1960e 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobStateTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobStateTest.java
@@ -194,7 +194,8 @@ public class JobStateTest {
Assert.assertEquals(jobExecutionInfo.getJobProperties().get("foo"), "bar");
List<String> taskStateIds = Lists.newArrayList();
- for (TaskExecutionInfo taskExecutionInfo :
jobExecutionInfo.getTaskExecutions()) {
+ for (int i = 0; i < jobExecutionInfo.getTaskExecutions().size(); i++) {
+ TaskExecutionInfo taskExecutionInfo =
jobExecutionInfo.getTaskExecutions().get(i);
Assert.assertEquals(taskExecutionInfo.getJobId(), "TestJob-1");
Assert.assertEquals(taskExecutionInfo.getStartTime().longValue(),
this.startTime);
Assert.assertEquals(taskExecutionInfo.getEndTime().longValue(),
this.startTime + 1000);
@@ -207,4 +208,5 @@ public class JobStateTest {
Collections.sort(taskStateIds);
Assert.assertEquals(taskStateIds, Lists.newArrayList("TestTask-0",
"TestTask-1", "TestTask-2"));
}
+
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
index 2cbde03039..366a1408f7 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityProducerTest.java
@@ -76,8 +76,8 @@ public class GaaSJobObservabilityProducerTest {
createTestIssue("issueSummary", "issueCode", IssueSeverity.INFO)
);
List<DatasetTaskSummary> summaries = new ArrayList<>();
- DatasetTaskSummary dataset1 = new DatasetTaskSummary("/testFolder", 100,
1000, true);
- DatasetTaskSummary dataset2 = new DatasetTaskSummary("/testFolder2", 1000,
10000, false);
+ DatasetTaskSummary dataset1 = new DatasetTaskSummary("/testFolder", 100,
1000, true, "PASSED");
+ DatasetTaskSummary dataset2 = new DatasetTaskSummary("/testFolder2", 1000,
10000, false, "PASSED");
summaries.add(dataset1);
summaries.add(dataset2);
Properties jobProps = new Properties();
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index b106cdc153..ba44dc6198 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -145,7 +145,7 @@ public class CommitActivityImpl implements CommitActivity {
boolean shouldIncludeFailedTasks =
PropertiesUtils.getPropAsBoolean(jobState.getProperties(),
ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
- Map<String, DatasetStats> datasetTaskSummaries =
summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(),
shouldIncludeFailedTasks);
+ Map<String, DatasetStats> datasetTaskSummaries =
summarizeDatasetOutcomes(datasetStatesByUrns, jobContext,
shouldIncludeFailedTasks);
return new CommitStats(
datasetTaskSummaries,
datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum(),
@@ -197,7 +197,7 @@ public class CommitActivityImpl implements CommitActivity {
@Override
public Callable<Void> apply(final Map.Entry<String,
JobState.DatasetState> entry) {
return new SafeDatasetCommit(shouldCommitDataInJob, false,
deliverySemantics, entry.getKey(),
- entry.getValue(), false, jobContext);
+ entry.getValue(), false, jobContext,
SafeDatasetCommit.COMMIT_SRC_COMMIT_ACTIVITY_IMPL);
}
}).iterator(), numCommitThreads,
// TODO: Rewrite executorUtils to use java util optional
@@ -249,8 +249,10 @@ public class CommitActivityImpl implements CommitActivity {
});
}
- private Map<String, DatasetStats> summarizeDatasetOutcomes(Map<String,
JobState.DatasetState> datasetStatesByUrns, JobCommitPolicy commitPolicy,
boolean shouldIncludeFailedTasks) {
+ private Map<String, DatasetStats> summarizeDatasetOutcomes(Map<String,
JobState.DatasetState> datasetStatesByUrns, JobContext jobContext, boolean
shouldIncludeFailedTasks) {
Map<String, DatasetStats> datasetTaskStats = new HashMap<>();
+ JobCommitPolicy commitPolicy = jobContext.getJobCommitPolicy();
+ JobState jobState = jobContext.getJobState();
// Only process successful datasets unless configuration to process failed
datasets is set
for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
if (datasetState.getState() == JobState.RunningState.COMMITTED ||
(datasetState.getState() == JobState.RunningState.FAILED
@@ -270,11 +272,11 @@ public class CommitActivityImpl implements CommitActivity
{
}
log.info(String.format("DatasetMetrics for '%s' - (records: %d; bytes:
%d)", datasetState.getDatasetUrn(),
totalRecordsWritten, totalBytesWritten));
- datasetTaskStats.put(datasetState.getDatasetUrn(), new
DatasetStats(totalRecordsWritten, totalBytesWritten, true,
totalCommittedTasks));
+ datasetTaskStats.put(datasetState.getDatasetUrn(), new
DatasetStats(totalRecordsWritten, totalBytesWritten, true, totalCommittedTasks,
jobState.getDataQualityStatus().name()));
} else if (datasetState.getState() == JobState.RunningState.FAILED &&
commitPolicy == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
// Check if config is turned on for submitting writer metrics on
failure due to non-atomic write semantics
log.info("Due to task failure, will report that no records or bytes
were written for " + datasetState.getDatasetUrn());
- datasetTaskStats.put(datasetState.getDatasetUrn(), new DatasetStats(
0, 0, false, 0));
+ datasetTaskStats.put(datasetState.getDatasetUrn(), new DatasetStats(
0, 0, false, 0, jobState.getDataQualityStatus().name()));
}
}
return datasetTaskStats;
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
index e6a0d9e417..f2a854fa7f 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
@@ -37,4 +37,5 @@ public class DatasetStats {
@NonNull private long bytesWritten;
@NonNull private boolean successfullyCommitted;
@NonNull private int numCommittedWorkunits;
+ @NonNull private String dataQualityCheckStatus;
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
index 5b5658b910..56f645fc49 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -61,8 +61,9 @@ public class CommitStepWorkflowImpl implements
CommitStepWorkflow {
private List<DatasetTaskSummary>
convertDatasetStatsToTaskSummaries(Map<String, DatasetStats> datasetStats) {
List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
for (Map.Entry<String, DatasetStats> entry : datasetStats.entrySet()) {
- datasetTaskSummaries.add(new DatasetTaskSummary(entry.getKey(),
entry.getValue().getRecordsWritten(), entry.getValue().getBytesWritten(),
entry.getValue().isSuccessfullyCommitted()));
+ datasetTaskSummaries.add(new DatasetTaskSummary(entry.getKey(),
entry.getValue().getRecordsWritten(), entry.getValue().getBytesWritten(),
entry.getValue().isSuccessfullyCommitted(),
entry.getValue().getDataQualityCheckStatus()));
}
+ log.info("Converted dataset stats to task summaries: {}",
datasetTaskSummaries);
return datasetTaskSummaries;
}
}