This is an automated email from the ASF dual-hosted git repository.
abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f7fe367c40 DQ handling for no bytes read scenarios and metrics for
bytes read/written (#4135)
f7fe367c40 is described below
commit f7fe367c4089ae7b0895c442d5e6a18b872b5761
Author: vsinghal85 <[email protected]>
AuthorDate: Mon Aug 25 12:08:39 2025 +0530
DQ handling for no bytes read scenarios and metrics for bytes read/written
(#4135)
* Introduce a NOT_EVALUATED result for the missing or zero bytes-read case
and add bytes read/written metrics
---------
Co-authored-by: Vaibhav Singhal <[email protected]>
---
.../qualitychecker/task/TaskLevelPolicy.java | 3 ++-
.../gobblin/policies/size/FileSizePolicy.java | 26 +++++++++++++-------
.../gobblin/policies/size/FileSizePolicyTest.java | 17 ++++++++++---
.../apache/gobblin/metrics/ServiceMetricNames.java | 2 ++
.../gobblin/quality/DataQualityEvaluator.java | 28 ++++++++++++++++++----
.../main/java/org/apache/gobblin/runtime/Task.java | 26 ++++++++++++++++++--
.../java/org/apache/gobblin/runtime/fork/Fork.java | 24 +++++++++++++++----
7 files changed, 102 insertions(+), 24 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicy.java
b/gobblin-api/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicy.java
index b5043e01e8..de53f26485 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicy.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicy.java
@@ -32,7 +32,8 @@ public abstract class TaskLevelPolicy {
public enum Result {
PASSED, // The test passed
- FAILED // The test failed
+ FAILED, // The test failed
+ NOT_EVALUATED // The test was not evaluated
}
public TaskLevelPolicy(State state, TaskLevelPolicy.Type type) {
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java
b/gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java
index d5f935d3bc..550f2299a3 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java
@@ -44,12 +44,12 @@ public class FileSizePolicy extends TaskLevelPolicy {
public Result executePolicy() {
TransferBytes transferBytes =
getBytesReadAndWritten(this.state).orElse(null);
if (transferBytes == null) {
- return Result.FAILED;
+ return Result.NOT_EVALUATED;
}
- Long bytesRead = transferBytes.getBytesRead();
- Long bytesWritten = transferBytes.getBytesWritten();
+ long bytesRead = transferBytes.getBytesRead();
+ long bytesWritten = transferBytes.getBytesWritten();
- Long sizeDifference = Math.abs(bytesRead - bytesWritten);
+ long sizeDifference = Math.abs(bytesRead - bytesWritten);
if (sizeDifference == 0) {
return Result.PASSED;
@@ -67,7 +67,7 @@ public class FileSizePolicy extends TaskLevelPolicy {
return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]",
transferBytes.getBytesRead(),
transferBytes.getBytesWritten());
} else {
- return "FileSizePolicy [bytesRead=null, bytesWritten=null]";
+ return "Transfer bytes information not available";
}
}
@@ -87,18 +87,28 @@ public class FileSizePolicy extends TaskLevelPolicy {
/**
* Extracts bytesRead and bytesWritten from the given state.
+ * If bytesRead is null/zero, skip data quality check by returning
Optional.empty().
* Returns Empty Optional if parsing fails.
*/
private Optional<TransferBytes> getBytesReadAndWritten(State state) {
String bytesReadString = state.getProp(BYTES_READ_KEY);
String bytesWrittenString = state.getProp(BYTES_WRITTEN_KEY);
- if (bytesReadString == null || bytesWrittenString == null) {
- log.error("Missing value(s): bytesReadStr={}, bytesWrittenStr={}",
bytesReadString, bytesWrittenString);
+ if (bytesReadString == null) {
+ log.error("Missing value(s): bytesReadStr=null, bytesWrittenStr={}",
bytesWrittenString);
return Optional.empty();
}
try {
long bytesRead = Long.parseLong(bytesReadString);
- long bytesWritten = Long.parseLong(bytesWrittenString);
+ if (bytesRead == 0) {
+ log.warn("Bytes read is zero, skipping file size check.");
+ return Optional.empty();
+ }
+ long bytesWritten = 0;
+ if (bytesWrittenString == null) {
+ log.error("Missing bytesWritten value: bytesWrittenStr=null, assuming
0 bytes written.");
+ } else {
+ bytesWritten = Long.parseLong(bytesWrittenString);
+ }
return Optional.of(new TransferBytes(bytesRead, bytesWritten));
} catch (NumberFormatException e) {
log.error("Invalid number format for bytesRead or bytesWritten:
bytesRead='{}', bytesWritten='{}'",
diff --git
a/gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java
b/gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java
index 0129891748..5a4d421215 100644
---
a/gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java
+++
b/gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java
@@ -49,7 +49,7 @@ public class FileSizePolicyTest {
State state = new State();
// No properties set at all
FileSizePolicy policy = new FileSizePolicy(state,
TaskLevelPolicy.Type.FAIL);
- Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
+ Assert.assertEquals(policy.executePolicy(),
TaskLevelPolicy.Result.NOT_EVALUATED);
}
@Test
@@ -64,9 +64,20 @@ public class FileSizePolicyTest {
// Reset state and only set bytes written, not bytes read
state = new State();
state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 1000L);
-
+ // bytes read is null
policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL);
- Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
+ Assert.assertEquals(policy.executePolicy(),
TaskLevelPolicy.Result.NOT_EVALUATED);
+ }
+
+ @Test
+ public void testEmptyDirectoryHandling() {
+ State state = new State();
+ // Test case: Empty directory with 0 bytes read, no bytes written
(directory case)
+ state.setProp(FileSizePolicy.BYTES_READ_KEY, 0L);
+ // Don't set BYTES_WRITTEN_KEY to simulate directory copy scenario
+
+ FileSizePolicy policy = new FileSizePolicy(state,
TaskLevelPolicy.Type.FAIL);
+ Assert.assertEquals(policy.executePolicy(),
TaskLevelPolicy.Result.NOT_EVALUATED);
}
}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 57e95508b6..7809e785f5 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -28,6 +28,8 @@ public class ServiceMetricNames {
public static final String DATA_QUALITY_SUCCESS_FILE_COUNT =
"dataQualitySuccessFileCount";
public static final String DATA_QUALITY_FAILURE_FILE_COUNT =
"dataQualityFailureFileCount";
public static final String DATA_QUALITY_NON_EVALUATED_FILE_COUNT =
"dataQualityNonEvaluatedFileCount";
+ public static final String DATA_QUALITY_BYTES_READ = "dataQualityBytesRead";
+ public static final String DATA_QUALITY_BYTES_WRITTEN =
"dataQualityBytesWritten";
// Flow Compilation Meters and Timer
public static final String FLOW_COMPILATION_SUCCESSFUL_METER =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "flowCompilation.successful";
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java
index 959f57053c..23590789a0 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java
@@ -32,6 +32,7 @@ import org.apache.gobblin.metrics.OpenTelemetryMetrics;
import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.policies.size.FileSizePolicy;
import org.apache.gobblin.qualitychecker.DataQualityStatus;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.TaskState;
@@ -66,6 +67,10 @@ public class DataQualityEvaluator {
private final int failedFiles;
// Number of files that were not evaluated for data quality for example
files not found or not processed
private final int nonEvaluatedFiles;
+ // total bytes read
+ private final long bytesRead;
+ // total bytes written
+ private final long bytesWritten;
}
/**
@@ -85,7 +90,7 @@ public class DataQualityEvaluator {
jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY,
result.getQualityStatus().name());
// Emit dataset-specific metrics
emitMetrics(jobState, result.getQualityStatus(), result.getTotalFiles(),
result.getPassedFiles(),
- result.getFailedFiles(), result.getNonEvaluatedFiles(),
datasetState.getDatasetUrn());
+ result.getFailedFiles(), result.getNonEvaluatedFiles(),
result.getBytesRead(), result.getBytesWritten(), datasetState.getDatasetUrn());
return result;
}
@@ -103,17 +108,19 @@ public class DataQualityEvaluator {
int failedFilesCount = 0;
int passedFilesCount = 0;
int nonEvaluatedFilesCount = 0;
+ long bytesRead = 0;
+ long bytesWritten = 0;
for (TaskState taskState : taskStates) {
totalFiles++;
-
// Handle null task states gracefully
if (taskState == null) {
log.warn("Encountered null task state, skipping data quality
evaluation for this task");
nonEvaluatedFilesCount++;
continue;
}
-
+ bytesRead += taskState.getPropAsLong(FileSizePolicy.BYTES_READ_KEY, 0L);
+ bytesWritten +=
taskState.getPropAsLong(FileSizePolicy.BYTES_WRITTEN_KEY, 0L);
DataQualityStatus taskDataQuality = null;
String result =
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
taskDataQuality = DataQualityStatus.fromString(result);
@@ -138,11 +145,11 @@ public class DataQualityEvaluator {
log.info("Data quality evaluation summary - Total: {}, Passed: {}, Failed:
{}, Not Evaluated: {}", totalFiles,
passedFilesCount, failedFilesCount, nonEvaluatedFilesCount);
return new DataQualityEvaluationResult(jobDataQualityStatus, totalFiles,
passedFilesCount, failedFilesCount,
- nonEvaluatedFilesCount);
+ nonEvaluatedFilesCount, bytesRead, bytesWritten);
}
private static void emitMetrics(JobState jobState, final DataQualityStatus
jobDataQuality, final int totalFiles,
- final int passedFilesCount, final int failedFilesCount, final int
nonEvaluatedFilesCount,
+ final int passedFilesCount, final int failedFilesCount, final int
nonEvaluatedFilesCount, final long bytesRead, final long bytesWritten,
final String datasetUrn) {
try {
// Check if OpenTelemetry is enabled
@@ -187,6 +194,17 @@ public class DataQualityEvaluator {
meter.counterBuilder(ServiceMetricNames.DATA_QUALITY_NON_EVALUATED_FILE_COUNT)
.setDescription("Number of files that did not have data quality
evaluation").build()
.add(nonEvaluatedFilesCount, tags);
+
+ // Emit bytes read
+ meter.counterBuilder(ServiceMetricNames.DATA_QUALITY_BYTES_READ)
+ .setDescription("Total bytes read").build()
+ .add(bytesRead, tags);
+
+ // Emit bytes written
+ meter.counterBuilder(ServiceMetricNames.DATA_QUALITY_BYTES_WRITTEN)
+ .setDescription("Total bytes written").build()
+ .add(bytesWritten, tags);
+
} catch (Exception e) {
log.error("Error in emitMetrics for job: {}", jobState.getJobName(), e);
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 2dd2e83f5d..76d8f79fa6 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -320,6 +320,8 @@ public class Task implements TaskIFace {
private void computeAndUpdateTaskDataQuality() {
DataQualityStatus overallTaskDataQuality = DataQualityStatus.PASSED;
+ boolean hasEvaluatedForks = false;
+
for (Optional<Fork> fork : this.forks.keySet()) {
if (fork.isPresent()) {
TaskState forkTaskState = fork.get().getTaskState();
@@ -329,17 +331,37 @@ public class Task implements TaskIFace {
try {
if (result != null) {
forkDataQualityStatus = DataQualityStatus.valueOf(result);
+ } else {
+ forkDataQualityStatus = DataQualityStatus.NOT_EVALUATED;
}
} catch (IllegalArgumentException e) {
Log.warn("Unknown data quality status encountered: " + result);
forkDataQualityStatus = DataQualityStatus.UNKNOWN;
}
- if (DataQualityStatus.FAILED == forkDataQualityStatus) {
- overallTaskDataQuality = DataQualityStatus.FAILED;
+ /*
+ * If any fork fails, overall status should be FAILED
+ * FAILED status cannot be overridden by subsequent successes
+ * Handle NOT_EVALUATED and UNKNOWN appropriately
+ * If forkDataQualityStatus is PASSED and overall is not
FAILED/UNKNOWN, keep PASSED
+ */
+ if (forkDataQualityStatus != DataQualityStatus.NOT_EVALUATED) {
+ hasEvaluatedForks = true;
+ if (forkDataQualityStatus == DataQualityStatus.FAILED) {
+ overallTaskDataQuality = DataQualityStatus.FAILED;
+ } else if (forkDataQualityStatus == DataQualityStatus.UNKNOWN &&
+ overallTaskDataQuality != DataQualityStatus.FAILED) {
+ overallTaskDataQuality = DataQualityStatus.UNKNOWN;
+ }
}
}
}
}
+
+ // If no forks were evaluated, set overall task status to NOT_EVALUATED
+ if (!hasEvaluatedForks) {
+ overallTaskDataQuality = DataQualityStatus.NOT_EVALUATED;
+ }
+
LOG.info("Data quality state of the task is {}", overallTaskDataQuality);
this.taskState.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY,
overallTaskDataQuality.name());
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index 0ff7cde453..81e2d69e2c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -630,11 +630,7 @@ public class Fork<S, D> implements Closeable, FinalState,
RecordStreamConsumer<S
TaskLevelPolicyCheckResults taskResults =
this.taskContext.getTaskLevelPolicyChecker(this.forkTaskState,
this.branches > 1 ? this.index : -1)
.executePolicies();
- boolean hasFailureForMandatoryPolicy = taskResults.getPolicyResults()
- .getOrDefault(TaskLevelPolicy.Result.FAILED,
java.util.Collections.emptySet())
- .contains(TaskLevelPolicy.Type.FAIL);
- forkTaskState.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY,
- hasFailureForMandatoryPolicy ? DataQualityStatus.FAILED.name() :
DataQualityStatus.PASSED.name());
+ this.computeAndUpdateForkDataQuality(taskResults);
TaskPublisher publisher =
this.taskContext.getTaskPublisher(this.forkTaskState, taskResults);
switch (publisher.canPublish()) {
case SUCCESS:
@@ -659,6 +655,24 @@ public class Fork<S, D> implements Closeable, FinalState,
RecordStreamConsumer<S
}
}
+ private void computeAndUpdateForkDataQuality(TaskLevelPolicyCheckResults
taskResults) {
+ boolean hasFailureForMandatoryPolicy =
+
taskResults.getPolicyResults().getOrDefault(TaskLevelPolicy.Result.FAILED,
java.util.Collections.emptySet())
+ .contains(TaskLevelPolicy.Type.FAIL);
+ boolean hasNotEvaluatedForMandatoryPolicy = taskResults.getPolicyResults()
+ .getOrDefault(TaskLevelPolicy.Result.NOT_EVALUATED,
java.util.Collections.emptySet())
+ .contains(TaskLevelPolicy.Type.FAIL);
+ String forkLevelDataQualityResult;
+ if (hasFailureForMandatoryPolicy) {
+ forkLevelDataQualityResult = DataQualityStatus.FAILED.name();
+ } else if (hasNotEvaluatedForMandatoryPolicy) {
+ forkLevelDataQualityResult = DataQualityStatus.NOT_EVALUATED.name();
+ } else {
+ forkLevelDataQualityResult = DataQualityStatus.PASSED.name();
+ }
+ forkTaskState.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY,
forkLevelDataQualityResult);
+ }
+
/**
* Commit task data.
*/