This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 577933891 [GOBBLIN-2026] Fail the dataset cleaner flow during any Exception (#3913)
577933891 is described below
commit 577933891f2621b68244598c63dd041864363f97
Author: Arpit Varshney <[email protected]>
AuthorDate: Fri Apr 5 22:12:46 2024 +0530
[GOBBLIN-2026] Fail the dataset cleaner flow during any Exception (#3913)
* Fix countDownLatch race condition
* Address review comments
---
.../gobblin/data/management/retention/DatasetCleaner.java | 3 ++-
.../retention/dataset/MultiVersionCleanableDatasetBase.java | 12 +++++++-----
2 files changed, 9 insertions(+), 6 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
index 13841ce45..eb3f367cc 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
@@ -153,7 +153,6 @@ public class DatasetCleaner implements Instrumentable,
Closeable {
Futures.addCallback(future, new FutureCallback<Void>() {
@Override
public void onFailure(Throwable throwable) {
- DatasetCleaner.this.finishCleanSignal.get().countDown();
LOG.warn("Exception caught when cleaning " + dataset.datasetURN() +
".", throwable);
DatasetCleaner.this.throwables.add(throwable);
Instrumented.markMeter(DatasetCleaner.this.datasetsCleanFailureMeter);
@@ -161,6 +160,8 @@ public class DatasetCleaner implements Instrumentable,
Closeable {
ImmutableMap.of(RetentionEvents.CleanFailed.FAILURE_CONTEXT_METADATA_KEY,
ExceptionUtils.getFullStackTrace(throwable),
RetentionEvents.DATASET_URN_METADATA_KEY,
dataset.datasetURN()));
+ // Count down last to avoid a race condition with close, which waits for the countDown to reach 0
+ DatasetCleaner.this.finishCleanSignal.get().countDown();
}
@Override
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java
index 5c6f9710f..0e42ef58e 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java
@@ -292,13 +292,13 @@ public abstract class MultiVersionCleanableDatasetBase<T
extends FileSystemDatas
cleanableVersionsBatch.add(version);
if (cleanableVersionsBatch.size() >= CLEANABLE_DATASET_BATCH_SIZE) {
boolean isCleanSuccess =
cleanDatasetVersions(cleanableVersionsBatch, selectionPolicy,
- versionFinderAndPolicy.getRetentionActions());
+ versionFinderAndPolicy.getRetentionActions(), false);
atLeastOneFailureSeen = atLeastOneFailureSeen || !isCleanSuccess;
}
}
if (!cleanableVersionsBatch.isEmpty()) {
boolean isCleanSuccess =
cleanDatasetVersions(cleanableVersionsBatch, selectionPolicy,
- versionFinderAndPolicy.getRetentionActions());
+ versionFinderAndPolicy.getRetentionActions(), false);
atLeastOneFailureSeen = atLeastOneFailureSeen || !isCleanSuccess;
}
} else {
@@ -309,7 +309,7 @@ public abstract class MultiVersionCleanableDatasetBase<T
extends FileSystemDatas
continue;
}
boolean isCleanSuccess =
- cleanDatasetVersions(versions, selectionPolicy,
versionFinderAndPolicy.getRetentionActions());
+ cleanDatasetVersions(versions, selectionPolicy,
versionFinderAndPolicy.getRetentionActions(), true);
atLeastOneFailureSeen = !isCleanSuccess || atLeastOneFailureSeen;
}
}
@@ -322,12 +322,14 @@ public abstract class MultiVersionCleanableDatasetBase<T
extends FileSystemDatas
}
private boolean cleanDatasetVersions(List<T> versions,
VersionSelectionPolicy<T> selectionPolicy,
- List<RetentionAction> retentionActions)
+ List<RetentionAction> retentionActions, boolean runningInBatchMode)
throws IOException {
boolean isCleanSuccess = true;
Collections.sort(versions, Collections.reverseOrder());
Collection<T> deletableVersions =
selectionPolicy.listSelectedVersions(versions);
- cleanImpl(deletableVersions);
+ if (!deletableVersions.isEmpty() || runningInBatchMode) {
+ cleanImpl(deletableVersions);
+ }
List<DatasetVersion> allVersions = Lists.newArrayList(versions);
for (RetentionAction retentionAction : retentionActions) {
try {