This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 146f39d49e5 [HUDI-5593] Fixing deadlocks due to async cleaner waiting 
for lock while main thread has acquired the lock and is waiting for async cleaner 
to finish (#7739)
146f39d49e5 is described below

commit 146f39d49e53eeef40bddd657b9ab11e217f3bc3
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Jan 24 00:28:11 2023 -0800

    [HUDI-5593] Fixing deadlocks due to async cleaner waiting for lock while 
main thread has acquired the lock and is waiting for async cleaner to finish 
(#7739)
---
 .../hudi/client/BaseHoodieTableServiceClient.java  | 61 +++++++++-------------
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 52 +++++++++++-------
 .../java/org/apache/hudi/table/HoodieTable.java    | 12 ++++-
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  9 ++--
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |  2 +-
 .../apache/hudi/client/HoodieJavaWriteClient.java  |  3 +-
 .../hudi/table/HoodieJavaCopyOnWriteTable.java     |  2 +-
 .../apache/hudi/client/SparkRDDWriteClient.java    |  3 +-
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |  4 +-
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  4 +-
 10 files changed, 85 insertions(+), 67 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index ec1041e3ab0..390bc4b9714 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -308,26 +308,6 @@ public abstract class BaseHoodieTableServiceClient<O> 
extends BaseHoodieClient i
     return scheduleTableService(instantTime, extraMetadata, 
TableServiceType.CLUSTER).isPresent();
   }
 
-  /**
-   * Schedules a new cleaning instant.
-   *
-   * @param extraMetadata Extra Metadata to be stored
-   */
-  protected Option<String> scheduleCleaning(Option<Map<String, String>> 
extraMetadata) throws HoodieIOException {
-    String instantTime = HoodieActiveTimeline.createNewInstantTime();
-    return scheduleCleaningAtInstant(instantTime, extraMetadata) ? 
Option.of(instantTime) : Option.empty();
-  }
-
-  /**
-   * Schedules a new cleaning instant with passed-in instant time.
-   *
-   * @param instantTime   cleaning Instant Time
-   * @param extraMetadata Extra Metadata to be stored
-   */
-  protected boolean scheduleCleaningAtInstant(String instantTime, 
Option<Map<String, String>> extraMetadata) throws HoodieIOException {
-    return scheduleTableService(instantTime, extraMetadata, 
TableServiceType.CLEAN).isPresent();
-  }
-
   /**
    * Ensures clustering instant is in expected state and performs clustering 
for the plan stored in metadata.
    *
@@ -527,16 +507,31 @@ public abstract class BaseHoodieTableServiceClient<O> 
extends BaseHoodieClient i
    *
    * @param cleanInstantTime instant time for clean.
    * @param scheduleInline   true if needs to be scheduled inline. false 
otherwise.
-   * @param skipLocking      if this is triggered by another parent 
transaction, locking can be skipped.
    */
   @Nullable
+  @Deprecated
   public HoodieCleanMetadata clean(String cleanInstantTime, boolean 
scheduleInline, boolean skipLocking) throws HoodieIOException {
+    return clean(cleanInstantTime, scheduleInline);
+  }
+
+  /**
+   * Clean up any stale/old files/data lying around (either on file storage or 
index storage) based on the
+   * configurations and CleaningPolicy used. (typically files that no longer 
can be used by a running query can be
+   * cleaned). This API provides the flexibility to schedule clean instant 
asynchronously via
+   * {@link BaseHoodieTableServiceClient#scheduleTableService(String, Option, 
TableServiceType)} and disable inline scheduling
+   * of clean.
+   *
+   * @param cleanInstantTime instant time for clean.
+   * @param scheduleInline   true if needs to be scheduled inline. false 
otherwise.
+   */
+  @Nullable
+  public HoodieCleanMetadata clean(String cleanInstantTime, boolean 
scheduleInline) throws HoodieIOException {
     if (!tableServicesEnabled(config)) {
       return null;
     }
     final Timer.Context timerContext = metrics.getCleanCtx();
     CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
-        HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));
+        HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites());
 
     HoodieTable table = createTable(config, hadoopConf);
     if (config.allowMultipleCleans() || 
!table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent())
 {
@@ -554,7 +549,7 @@ public abstract class BaseHoodieTableServiceClient<O> 
extends BaseHoodieClient i
     }
 
     // Proceeds to execute any requested or inflight clean instances in the 
timeline
-    HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime, 
skipLocking);
+    HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime);
     if (timerContext != null && metadata != null) {
       long durationMs = metrics.getDurationInMs(timerContext.stop());
       metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
@@ -569,16 +564,15 @@ public abstract class BaseHoodieTableServiceClient<O> 
extends BaseHoodieClient i
    * Trigger archival for the table. This ensures that the number of commits 
do not explode
    * and keep increasing unbounded over time.
    * @param table table to commit on.
-   * @param acquireLockForArchival true if lock has to be acquired for 
archival. false otherwise.
    */
-  protected void archive(HoodieTable table, boolean acquireLockForArchival) {
+  protected void archive(HoodieTable table) {
     if (!tableServicesEnabled(config)) {
       return;
     }
     try {
       // We cannot have unbounded commit files. Archive commits if we have to 
archive
       HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, 
table);
-      archiver.archiveIfRequired(context, acquireLockForArchival);
+      archiver.archiveIfRequired(context, true);
     } catch (IOException ioe) {
       throw new HoodieIOException("Failed to archive", ioe);
     }
@@ -670,23 +664,18 @@ public abstract class BaseHoodieTableServiceClient<O> 
extends BaseHoodieClient i
    * @return true if rollback was triggered. false otherwise.
    */
   protected Boolean rollbackFailedWrites() {
-    return rollbackFailedWrites(false);
-  }
-
-  /**
-   * Rollback all failed writes.
-   * @param skipLocking if this is triggered by another parent transaction, 
locking can be skipped.
-   * @return true if rollback was triggered. false otherwise.
-   */
-  protected Boolean rollbackFailedWrites(boolean skipLocking) {
     HoodieTable table = createTable(config, hadoopConf);
     List<String> instantsToRollback = 
getInstantsToRollback(table.getMetaClient(), 
config.getFailedWritesCleanPolicy(), Option.empty());
     Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = 
getPendingRollbackInfos(table.getMetaClient());
     instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, 
Option.empty()));
-    rollbackFailedWrites(pendingRollbacks, skipLocking);
+    rollbackFailedWrites(pendingRollbacks);
     return !pendingRollbacks.isEmpty();
   }
 
+  protected void rollbackFailedWrites(Map<String, 
Option<HoodiePendingRollbackInfo>> instantsToRollback) {
+    rollbackFailedWrites(instantsToRollback, false);
+  }
+
   protected void rollbackFailedWrites(Map<String, 
Option<HoodiePendingRollbackInfo>> instantsToRollback, boolean skipLocking) {
     // sort in reverse order of commit times
     LinkedHashMap<String, Option<HoodiePendingRollbackInfo>> 
reverseSortedRollbackInstants = instantsToRollback.entrySet()
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 818fa82e568..c3260914bd5 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -231,8 +231,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
         extraPreCommitFunc.get().accept(table.getMetaClient(), metadata);
       }
       commit(table, commitActionType, instantTime, metadata, stats);
-      // already within lock, and so no lock requried for archival
-      postCommit(table, metadata, instantTime, extraMetadata, false);
+      postCommit(table, metadata, instantTime, extraMetadata);
       LOG.info("Committed " + instantTime);
       releaseResources();
     } catch (IOException e) {
@@ -241,6 +240,9 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
       this.txnManager.endTransaction(Option.of(inflightInstant));
     }
 
+    // trigger clean and archival.
+    // Each internal call should acquire the lock itself if required.
+    mayBeCleanAndArchive(table);
     // We don't want to fail the commit if 
hoodie.fail.writes.on.inline.table.service.exception is false. We catch warn if 
false
     try {
       // do this outside of lock since compaction, clustering can be time 
taking and we don't need a lock for the entire execution period
@@ -514,21 +516,26 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
    * @param metadata      Commit Metadata corresponding to committed instant
    * @param instantTime   Instant Time
    * @param extraMetadata Additional Metadata passed by user
-   * @param acquireLockForArchival true if lock has to be acquired for 
archival. false otherwise.
    */
-  protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, 
String instantTime, Option<Map<String, String>> extraMetadata,
-                            boolean acquireLockForArchival) {
+  protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, 
String instantTime, Option<Map<String, String>> extraMetadata) {
     try {
       // Delete the marker directory for the instant.
       WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-      autoCleanOnCommit();
-      autoArchiveOnCommit(table, acquireLockForArchival);
     } finally {
       this.heartbeatClient.stop(instantTime);
     }
   }
 
+  /**
+   * Triggers cleaning and archival for the table of interest. This method is 
called outside of locks. So, internal callers should ensure they acquire lock 
wherever applicable.
+   * @param table instance of {@link HoodieTable} of interest.
+   */
+  protected void mayBeCleanAndArchive(HoodieTable table) {
+    autoCleanOnCommit();
+    autoArchiveOnCommit(table);
+  }
+
   protected void runTableServicesInline(HoodieTable table, 
HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
     tableServiceClient.runTableServicesInline(table, metadata, extraMetadata);
   }
@@ -545,11 +552,11 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
     } else {
       LOG.info("Start to clean synchronously.");
       // Do not reuse instantTime for clean as metadata table requires all 
changes to have unique instant timestamps.
-      clean(true);
+      clean();
     }
   }
 
-  protected void autoArchiveOnCommit(HoodieTable table, boolean 
acquireLockForArchival) {
+  protected void autoArchiveOnCommit(HoodieTable table) {
     if (!config.isAutoArchive()) {
       return;
     }
@@ -560,7 +567,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
       LOG.info("Async archiver has finished");
     } else {
       LOG.info("Start to archive synchronously.");
-      archive(table, acquireLockForArchival);
+      archive(table);
     }
   }
 
@@ -729,10 +736,12 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
    * @param skipLocking if this is triggered by another parent transaction, 
locking can be skipped.
    * @return instance of {@link HoodieCleanMetadata}.
    */
+  @Deprecated
   public HoodieCleanMetadata clean(String cleanInstantTime, boolean 
skipLocking) throws HoodieIOException {
-    return clean(cleanInstantTime, true, skipLocking);
+    return clean(cleanInstantTime, true, false);
   }
 
+
   /**
    * Clean up any stale/old files/data lying around (either on file storage or 
index storage) based on the
    * configurations and CleaningPolicy used. (typically files that no longer 
can be used by a running query can be
@@ -744,11 +753,11 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
    * @param skipLocking if this is triggered by another parent transaction, 
locking can be skipped.
    */
   public HoodieCleanMetadata clean(String cleanInstantTime, boolean 
scheduleInline, boolean skipLocking) throws HoodieIOException {
-    return tableServiceClient.clean(cleanInstantTime, scheduleInline, 
skipLocking);
+    return tableServiceClient.clean(cleanInstantTime, scheduleInline);
   }
 
   public HoodieCleanMetadata clean() {
-    return clean(false);
+    return clean(HoodieActiveTimeline.createNewInstantTime());
   }
 
   /**
@@ -757,18 +766,18 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
    * @param skipLocking if this is triggered by another parent transaction, 
locking can be skipped.
    * @return instance of {@link HoodieCleanMetadata}.
    */
+  @Deprecated
   public HoodieCleanMetadata clean(boolean skipLocking) {
-    return clean(HoodieActiveTimeline.createNewInstantTime(), skipLocking);
+    return clean(HoodieActiveTimeline.createNewInstantTime());
   }
 
   /**
    * Trigger archival for the table. This ensures that the number of commits 
do not explode
    * and keep increasing unbounded over time.
    * @param table table to commit on.
-   * @param acquireLockForArchival true if lock has to be acquired for 
archival. false otherwise.
    */
-  protected void archive(HoodieTable table, boolean acquireLockForArchival) {
-    tableServiceClient.archive(table, acquireLockForArchival);
+  protected void archive(HoodieTable table) {
+    tableServiceClient.archive(table);
   }
 
   /**
@@ -778,7 +787,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
   public void archive() {
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable table = createTable(config, hadoopConf);
-    archive(table, true);
+    archive(table);
   }
 
   /**
@@ -1240,6 +1249,12 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
     }
   }
 
+  /**
+   * Upgrades the hoodie table if need be when moving to a new Hudi version.
+   * This method is called within a lock. Try to avoid double locking from 
within this method.
+   * @param metaClient instance of {@link HoodieTableMetaClient} to use.
+   * @param instantTime instant time of interest if we have one.
+   */
   protected void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> 
instantTime) {
     UpgradeDowngrade upgradeDowngrade =
         new UpgradeDowngrade(metaClient, config, context, 
upgradeDowngradeHelper);
@@ -1252,7 +1267,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
       if (!instantsToRollback.isEmpty()) {
         Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = 
tableServiceClient.getPendingRollbackInfos(metaClient);
         instantsToRollback.forEach(entry -> 
pendingRollbacks.putIfAbsent(entry, Option.empty()));
-
         tableServiceClient.rollbackFailedWrites(pendingRollbacks, true);
       }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index ff9177ab7d2..af8d8d23261 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -495,7 +495,17 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
    *
    * @return information on cleaned file slices
    */
-  public abstract HoodieCleanMetadata clean(HoodieEngineContext context, 
String cleanInstantTime, boolean skipLocking);
+  @Deprecated
+  public HoodieCleanMetadata clean(HoodieEngineContext context, String 
cleanInstantTime, boolean skipLocking) {
+    return clean(context, cleanInstantTime);
+  }
+
+  /**
+   * Executes a new clean action.
+   *
+   * @return information on cleaned file slices
+   */
+  public abstract HoodieCleanMetadata clean(HoodieEngineContext context, 
String cleanInstantTime);
 
   /**
    * Schedule rollback for the instant time.
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 2ecd1c78076..7d0728b95eb 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -338,18 +338,21 @@ public class HoodieFlinkWriteClient<T> extends
   protected void postCommit(HoodieTable table,
                             HoodieCommitMetadata metadata,
                             String instantTime,
-                            Option<Map<String, String>> extraMetadata,
-                            boolean acquireLockForArchival) {
+                            Option<Map<String, String>> extraMetadata) {
     try {
       // Delete the marker directory for the instant.
       WriteMarkersFactory.get(config.getMarkersType(), createTable(config, 
hadoopConf), instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-      autoArchiveOnCommit(table, acquireLockForArchival);
     } finally {
       this.heartbeatClient.stop(instantTime);
     }
   }
 
+  @Override
+  protected void mayBeCleanAndArchive(HoodieTable table) {
+    autoArchiveOnCommit(table);
+  }
+
   @Override
   public void commitCompaction(
       String compactionInstantTime,
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index f09cf106db2..cb046e2e91f 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -318,7 +318,7 @@ public class HoodieFlinkCopyOnWriteTable<T>
   }
 
   @Override
-  public HoodieCleanMetadata clean(HoodieEngineContext context, String 
cleanInstantTime, boolean skipLocking) {
+  public HoodieCleanMetadata clean(HoodieEngineContext context, String 
cleanInstantTime) {
     return new CleanActionExecutor(context, config, this, 
cleanInstantTime).execute();
   }
 
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index 0f7f48194cd..af35078b9a9 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -206,7 +206,8 @@ public class HoodieJavaWriteClient<T> extends
             result.getWriteStats().get().size());
       }
 
-      postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, 
Option.empty(), true);
+      postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, 
Option.empty());
+      mayBeCleanAndArchive(hoodieTable);
 
       emitCommitMetrics(instantTime, result.getCommitMetadata().get(), 
hoodieTable.getMetaClient().getCommitActionType());
     }
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index f3899b38590..5431bfd0c4f 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -216,7 +216,7 @@ public class HoodieJavaCopyOnWriteTable<T>
 
   @Override
   public HoodieCleanMetadata clean(HoodieEngineContext context,
-                                   String cleanInstantTime, boolean 
skipLocking) {
+                                   String cleanInstantTime) {
     return new CleanActionExecutor(context, config, this, 
cleanInstantTime).execute();
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 5c094b8d5be..86f559260a7 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -267,7 +267,8 @@ public class SparkRDDWriteClient<T> extends
             result.getWriteStats().get().size());
       }
 
-      postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, 
Option.empty(), true);
+      postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, 
Option.empty());
+      mayBeCleanAndArchive(hoodieTable);
 
       emitCommitMetrics(instantTime, result.getCommitMetadata().get(), 
hoodieTable.getMetaClient().getCommitActionType());
     }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 9691c8a9ab7..fa8f98022b4 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -259,8 +259,8 @@ public class HoodieSparkCopyOnWriteTable<T>
   }
 
   @Override
-  public HoodieCleanMetadata clean(HoodieEngineContext context, String 
cleanInstantTime, boolean skipLocking) {
-    return new CleanActionExecutor<>(context, config, this, cleanInstantTime, 
skipLocking).execute();
+  public HoodieCleanMetadata clean(HoodieEngineContext context, String 
cleanInstantTime) {
+    return new CleanActionExecutor<>(context, config, this, cleanInstantTime, 
false).execute();
   }
 
   @Override
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index fc59d0ff9e0..c2073c703d9 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -984,7 +984,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   @ParameterizedTest
-  @CsvSource(value = {"false, AVRO", "false, SPARK"}) // TODO set asyncClean 
to true; disabled due to lock acquiring issue (HUDI-5593)
+  @CsvSource(value = {"true, AVRO", "true, SPARK", "false, AVRO", "false, 
SPARK"})
   public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean, 
HoodieRecordType recordType) throws Exception {
     String tableBasePath = basePath + "/cleanerDeleteReplacedDataWithArchive" 
+ asyncClean;
 
@@ -1052,7 +1052,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     configs.add(String.format("%s=%s", 
HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "3"));
     configs.add(String.format("%s=%s", HoodieCleanConfig.ASYNC_CLEAN.key(), 
asyncClean));
     configs.add(String.format("%s=%s", 
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
-    cfg.configs.add(String.format("%s=%s", 
HoodieWriteConfig.MARKERS_TYPE.key(), "DIRECT"));
+    configs.add(String.format("%s=%s", HoodieWriteConfig.MARKERS_TYPE.key(), 
"DIRECT"));
     if (asyncClean) {
       configs.add(String.format("%s=%s", 
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
           WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()));

Reply via email to