This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 577933891 [GOBBLIN-2026] Fail the dataset cleaner flow during any Exception (#3913)
577933891 is described below

commit 577933891f2621b68244598c63dd041864363f97
Author: Arpit Varshney <[email protected]>
AuthorDate: Fri Apr 5 22:12:46 2024 +0530

    [GOBBLIN-2026] Fail the dataset cleaner flow during any Exception (#3913)
    
    * Fix CountDownLatch race condition
    * Address review comments
---
 .../gobblin/data/management/retention/DatasetCleaner.java    |  3 ++-
 .../retention/dataset/MultiVersionCleanableDatasetBase.java  | 12 +++++++-----
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
index 13841ce45..eb3f367cc 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
@@ -153,7 +153,6 @@ public class DatasetCleaner implements Instrumentable, 
Closeable {
       Futures.addCallback(future, new FutureCallback<Void>() {
         @Override
         public void onFailure(Throwable throwable) {
-          DatasetCleaner.this.finishCleanSignal.get().countDown();
           LOG.warn("Exception caught when cleaning " + dataset.datasetURN() + 
".", throwable);
           DatasetCleaner.this.throwables.add(throwable);
           
Instrumented.markMeter(DatasetCleaner.this.datasetsCleanFailureMeter);
@@ -161,6 +160,8 @@ public class DatasetCleaner implements Instrumentable, 
Closeable {
               
ImmutableMap.of(RetentionEvents.CleanFailed.FAILURE_CONTEXT_METADATA_KEY,
                   ExceptionUtils.getFullStackTrace(throwable), 
RetentionEvents.DATASET_URN_METADATA_KEY,
                   dataset.datasetURN()));
+          // Moving the countDown at the end, avoid race-condition with close 
waiting for the countDown to be 0
+          DatasetCleaner.this.finishCleanSignal.get().countDown();
         }
 
         @Override
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java
index 5c6f9710f..0e42ef58e 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java
@@ -292,13 +292,13 @@ public abstract class MultiVersionCleanableDatasetBase<T 
extends FileSystemDatas
           cleanableVersionsBatch.add(version);
           if (cleanableVersionsBatch.size() >= CLEANABLE_DATASET_BATCH_SIZE) {
             boolean isCleanSuccess = 
cleanDatasetVersions(cleanableVersionsBatch, selectionPolicy,
-                versionFinderAndPolicy.getRetentionActions());
+                versionFinderAndPolicy.getRetentionActions(), false);
             atLeastOneFailureSeen = atLeastOneFailureSeen || !isCleanSuccess;
           }
         }
         if (!cleanableVersionsBatch.isEmpty()) {
           boolean isCleanSuccess = 
cleanDatasetVersions(cleanableVersionsBatch, selectionPolicy,
-              versionFinderAndPolicy.getRetentionActions());
+              versionFinderAndPolicy.getRetentionActions(), false);
           atLeastOneFailureSeen = atLeastOneFailureSeen || !isCleanSuccess;
         }
       } else {
@@ -309,7 +309,7 @@ public abstract class MultiVersionCleanableDatasetBase<T 
extends FileSystemDatas
           continue;
         }
         boolean isCleanSuccess =
-            cleanDatasetVersions(versions, selectionPolicy, 
versionFinderAndPolicy.getRetentionActions());
+            cleanDatasetVersions(versions, selectionPolicy, 
versionFinderAndPolicy.getRetentionActions(), true);
         atLeastOneFailureSeen = !isCleanSuccess || atLeastOneFailureSeen;
       }
     }
@@ -322,12 +322,14 @@ public abstract class MultiVersionCleanableDatasetBase<T 
extends FileSystemDatas
   }
 
   private boolean cleanDatasetVersions(List<T> versions, 
VersionSelectionPolicy<T> selectionPolicy,
-      List<RetentionAction> retentionActions)
+      List<RetentionAction> retentionActions, boolean runningInBatchMode)
       throws IOException {
     boolean isCleanSuccess = true;
     Collections.sort(versions, Collections.reverseOrder());
     Collection<T> deletableVersions = 
selectionPolicy.listSelectedVersions(versions);
-    cleanImpl(deletableVersions);
+    if (!deletableVersions.isEmpty() || runningInBatchMode) {
+      cleanImpl(deletableVersions);
+    }
     List<DatasetVersion> allVersions = Lists.newArrayList(versions);
     for (RetentionAction retentionAction : retentionActions) {
       try {

Reply via email to