This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 146f39d49e5 [HUDI-5593] Fixing deadlocks due to the async cleaner awaiting
the lock while the main thread has acquired the lock and is awaiting the async
cleaner to finish (#7739)
146f39d49e5 is described below
commit 146f39d49e53eeef40bddd657b9ab11e217f3bc3
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Jan 24 00:28:11 2023 -0800
[HUDI-5593] Fixing deadlocks due to the async cleaner awaiting the lock while
the main thread has acquired the lock and is awaiting the async cleaner to
finish (#7739)
---
.../hudi/client/BaseHoodieTableServiceClient.java | 61 +++++++++-------------
.../apache/hudi/client/BaseHoodieWriteClient.java | 52 +++++++++++-------
.../java/org/apache/hudi/table/HoodieTable.java | 12 ++++-
.../apache/hudi/client/HoodieFlinkWriteClient.java | 9 ++--
.../hudi/table/HoodieFlinkCopyOnWriteTable.java | 2 +-
.../apache/hudi/client/HoodieJavaWriteClient.java | 3 +-
.../hudi/table/HoodieJavaCopyOnWriteTable.java | 2 +-
.../apache/hudi/client/SparkRDDWriteClient.java | 3 +-
.../hudi/table/HoodieSparkCopyOnWriteTable.java | 4 +-
.../deltastreamer/TestHoodieDeltaStreamer.java | 4 +-
10 files changed, 85 insertions(+), 67 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index ec1041e3ab0..390bc4b9714 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -308,26 +308,6 @@ public abstract class BaseHoodieTableServiceClient<O>
extends BaseHoodieClient i
return scheduleTableService(instantTime, extraMetadata,
TableServiceType.CLUSTER).isPresent();
}
- /**
- * Schedules a new cleaning instant.
- *
- * @param extraMetadata Extra Metadata to be stored
- */
- protected Option<String> scheduleCleaning(Option<Map<String, String>>
extraMetadata) throws HoodieIOException {
- String instantTime = HoodieActiveTimeline.createNewInstantTime();
- return scheduleCleaningAtInstant(instantTime, extraMetadata) ?
Option.of(instantTime) : Option.empty();
- }
-
- /**
- * Schedules a new cleaning instant with passed-in instant time.
- *
- * @param instantTime cleaning Instant Time
- * @param extraMetadata Extra Metadata to be stored
- */
- protected boolean scheduleCleaningAtInstant(String instantTime,
Option<Map<String, String>> extraMetadata) throws HoodieIOException {
- return scheduleTableService(instantTime, extraMetadata,
TableServiceType.CLEAN).isPresent();
- }
-
/**
* Ensures clustering instant is in expected state and performs clustering
for the plan stored in metadata.
*
@@ -527,16 +507,31 @@ public abstract class BaseHoodieTableServiceClient<O>
extends BaseHoodieClient i
*
* @param cleanInstantTime instant time for clean.
* @param scheduleInline true if needs to be scheduled inline. false
otherwise.
- * @param skipLocking if this is triggered by another parent
transaction, locking can be skipped.
*/
@Nullable
+ @Deprecated
public HoodieCleanMetadata clean(String cleanInstantTime, boolean
scheduleInline, boolean skipLocking) throws HoodieIOException {
+ return clean(cleanInstantTime, scheduleInline);
+ }
+
+ /**
+ * Clean up any stale/old files/data lying around (either on file storage or
index storage) based on the
+ * configurations and CleaningPolicy used. (typically files that no longer
can be used by a running query can be
+ * cleaned). This API provides the flexibility to schedule clean instant
asynchronously via
+ * {@link BaseHoodieTableServiceClient#scheduleTableService(String, Option,
TableServiceType)} and disable inline scheduling
+ * of clean.
+ *
+ * @param cleanInstantTime instant time for clean.
+ * @param scheduleInline true if needs to be scheduled inline. false
otherwise.
+ */
+ @Nullable
+ public HoodieCleanMetadata clean(String cleanInstantTime, boolean
scheduleInline) throws HoodieIOException {
if (!tableServicesEnabled(config)) {
return null;
}
final Timer.Context timerContext = metrics.getCleanCtx();
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
- HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));
+ HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites());
HoodieTable table = createTable(config, hadoopConf);
if (config.allowMultipleCleans() ||
!table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent())
{
@@ -554,7 +549,7 @@ public abstract class BaseHoodieTableServiceClient<O>
extends BaseHoodieClient i
}
// Proceeds to execute any requested or inflight clean instances in the
timeline
- HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime,
skipLocking);
+ HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime);
if (timerContext != null && metadata != null) {
long durationMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
@@ -569,16 +564,15 @@ public abstract class BaseHoodieTableServiceClient<O>
extends BaseHoodieClient i
* Trigger archival for the table. This ensures that the number of commits
do not explode
* and keep increasing unbounded over time.
* @param table table to commit on.
- * @param acquireLockForArchival true if lock has to be acquired for
archival. false otherwise.
*/
- protected void archive(HoodieTable table, boolean acquireLockForArchival) {
+ protected void archive(HoodieTable table) {
if (!tableServicesEnabled(config)) {
return;
}
try {
// We cannot have unbounded commit files. Archive commits if we have to
archive
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config,
table);
- archiver.archiveIfRequired(context, acquireLockForArchival);
+ archiver.archiveIfRequired(context, true);
} catch (IOException ioe) {
throw new HoodieIOException("Failed to archive", ioe);
}
@@ -670,23 +664,18 @@ public abstract class BaseHoodieTableServiceClient<O>
extends BaseHoodieClient i
* @return true if rollback was triggered. false otherwise.
*/
protected Boolean rollbackFailedWrites() {
- return rollbackFailedWrites(false);
- }
-
- /**
- * Rollback all failed writes.
- * @param skipLocking if this is triggered by another parent transaction,
locking can be skipped.
- * @return true if rollback was triggered. false otherwise.
- */
- protected Boolean rollbackFailedWrites(boolean skipLocking) {
HoodieTable table = createTable(config, hadoopConf);
List<String> instantsToRollback =
getInstantsToRollback(table.getMetaClient(),
config.getFailedWritesCleanPolicy(), Option.empty());
Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks =
getPendingRollbackInfos(table.getMetaClient());
instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry,
Option.empty()));
- rollbackFailedWrites(pendingRollbacks, skipLocking);
+ rollbackFailedWrites(pendingRollbacks);
return !pendingRollbacks.isEmpty();
}
+ protected void rollbackFailedWrites(Map<String,
Option<HoodiePendingRollbackInfo>> instantsToRollback) {
+ rollbackFailedWrites(instantsToRollback, false);
+ }
+
protected void rollbackFailedWrites(Map<String,
Option<HoodiePendingRollbackInfo>> instantsToRollback, boolean skipLocking) {
// sort in reverse order of commit times
LinkedHashMap<String, Option<HoodiePendingRollbackInfo>>
reverseSortedRollbackInstants = instantsToRollback.entrySet()
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 818fa82e568..c3260914bd5 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -231,8 +231,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
extraPreCommitFunc.get().accept(table.getMetaClient(), metadata);
}
commit(table, commitActionType, instantTime, metadata, stats);
- // already within lock, and so no lock requried for archival
- postCommit(table, metadata, instantTime, extraMetadata, false);
+ postCommit(table, metadata, instantTime, extraMetadata);
LOG.info("Committed " + instantTime);
releaseResources();
} catch (IOException e) {
@@ -241,6 +240,9 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
this.txnManager.endTransaction(Option.of(inflightInstant));
}
+ // Trigger clean and archival.
+ // Each internal call should acquire the lock if required.
+ mayBeCleanAndArchive(table);
// We don't want to fail the commit if
hoodie.fail.writes.on.inline.table.service.exception is false. We catch warn if
false
try {
// do this outside of lock since compaction, clustering can be time
taking and we don't need a lock for the entire execution period
@@ -514,21 +516,26 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
* @param metadata Commit Metadata corresponding to committed instant
* @param instantTime Instant Time
* @param extraMetadata Additional Metadata passed by user
- * @param acquireLockForArchival true if lock has to be acquired for
archival. false otherwise.
*/
- protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata,
String instantTime, Option<Map<String, String>> extraMetadata,
- boolean acquireLockForArchival) {
+ protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata,
String instantTime, Option<Map<String, String>> extraMetadata) {
try {
// Delete the marker directory for the instant.
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
- autoCleanOnCommit();
- autoArchiveOnCommit(table, acquireLockForArchival);
} finally {
this.heartbeatClient.stop(instantTime);
}
}
+ /**
+ * Triggers cleaning and archival for the table of interest. This method is
called outside of locks, so internal callers should ensure they acquire the
lock wherever applicable.
+ * @param table instance of {@link HoodieTable} of interest.
+ */
+ protected void mayBeCleanAndArchive(HoodieTable table) {
+ autoCleanOnCommit();
+ autoArchiveOnCommit(table);
+ }
+
protected void runTableServicesInline(HoodieTable table,
HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
tableServiceClient.runTableServicesInline(table, metadata, extraMetadata);
}
@@ -545,11 +552,11 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
} else {
LOG.info("Start to clean synchronously.");
// Do not reuse instantTime for clean as metadata table requires all
changes to have unique instant timestamps.
- clean(true);
+ clean();
}
}
- protected void autoArchiveOnCommit(HoodieTable table, boolean
acquireLockForArchival) {
+ protected void autoArchiveOnCommit(HoodieTable table) {
if (!config.isAutoArchive()) {
return;
}
@@ -560,7 +567,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
LOG.info("Async archiver has finished");
} else {
LOG.info("Start to archive synchronously.");
- archive(table, acquireLockForArchival);
+ archive(table);
}
}
@@ -729,10 +736,12 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
* @param skipLocking if this is triggered by another parent transaction,
locking can be skipped.
* @return instance of {@link HoodieCleanMetadata}.
*/
+ @Deprecated
public HoodieCleanMetadata clean(String cleanInstantTime, boolean
skipLocking) throws HoodieIOException {
- return clean(cleanInstantTime, true, skipLocking);
+ return clean(cleanInstantTime, true, false);
}
+
/**
* Clean up any stale/old files/data lying around (either on file storage or
index storage) based on the
* configurations and CleaningPolicy used. (typically files that no longer
can be used by a running query can be
@@ -744,11 +753,11 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
* @param skipLocking if this is triggered by another parent transaction,
locking can be skipped.
*/
public HoodieCleanMetadata clean(String cleanInstantTime, boolean
scheduleInline, boolean skipLocking) throws HoodieIOException {
- return tableServiceClient.clean(cleanInstantTime, scheduleInline,
skipLocking);
+ return tableServiceClient.clean(cleanInstantTime, scheduleInline);
}
public HoodieCleanMetadata clean() {
- return clean(false);
+ return clean(HoodieActiveTimeline.createNewInstantTime());
}
/**
@@ -757,18 +766,18 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
* @param skipLocking if this is triggered by another parent transaction,
locking can be skipped.
* @return instance of {@link HoodieCleanMetadata}.
*/
+ @Deprecated
public HoodieCleanMetadata clean(boolean skipLocking) {
- return clean(HoodieActiveTimeline.createNewInstantTime(), skipLocking);
+ return clean(HoodieActiveTimeline.createNewInstantTime());
}
/**
* Trigger archival for the table. This ensures that the number of commits
do not explode
* and keep increasing unbounded over time.
* @param table table to commit on.
- * @param acquireLockForArchival true if lock has to be acquired for
archival. false otherwise.
*/
- protected void archive(HoodieTable table, boolean acquireLockForArchival) {
- tableServiceClient.archive(table, acquireLockForArchival);
+ protected void archive(HoodieTable table) {
+ tableServiceClient.archive(table);
}
/**
@@ -778,7 +787,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
public void archive() {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable table = createTable(config, hadoopConf);
- archive(table, true);
+ archive(table);
}
/**
@@ -1240,6 +1249,12 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
}
}
+ /**
+ * Upgrades the hoodie table if need be when moving to a new Hudi version.
+ * This method is called within a lock. Try to avoid double locking from
within this method.
+ * @param metaClient instance of {@link HoodieTableMetaClient} to use.
+ * @param instantTime instant time of interest if we have one.
+ */
protected void tryUpgrade(HoodieTableMetaClient metaClient, Option<String>
instantTime) {
UpgradeDowngrade upgradeDowngrade =
new UpgradeDowngrade(metaClient, config, context,
upgradeDowngradeHelper);
@@ -1252,7 +1267,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
if (!instantsToRollback.isEmpty()) {
Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks =
tableServiceClient.getPendingRollbackInfos(metaClient);
instantsToRollback.forEach(entry ->
pendingRollbacks.putIfAbsent(entry, Option.empty()));
-
tableServiceClient.rollbackFailedWrites(pendingRollbacks, true);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index ff9177ab7d2..af8d8d23261 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -495,7 +495,17 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
*
* @return information on cleaned file slices
*/
- public abstract HoodieCleanMetadata clean(HoodieEngineContext context,
String cleanInstantTime, boolean skipLocking);
+ @Deprecated
+ public HoodieCleanMetadata clean(HoodieEngineContext context, String
cleanInstantTime, boolean skipLocking) {
+ return clean(context, cleanInstantTime);
+ }
+
+ /**
+ * Executes a new clean action.
+ *
+ * @return information on cleaned file slices
+ */
+ public abstract HoodieCleanMetadata clean(HoodieEngineContext context,
String cleanInstantTime);
/**
* Schedule rollback for the instant time.
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 2ecd1c78076..7d0728b95eb 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -338,18 +338,21 @@ public class HoodieFlinkWriteClient<T> extends
protected void postCommit(HoodieTable table,
HoodieCommitMetadata metadata,
String instantTime,
- Option<Map<String, String>> extraMetadata,
- boolean acquireLockForArchival) {
+ Option<Map<String, String>> extraMetadata) {
try {
// Delete the marker directory for the instant.
WriteMarkersFactory.get(config.getMarkersType(), createTable(config,
hadoopConf), instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
- autoArchiveOnCommit(table, acquireLockForArchival);
} finally {
this.heartbeatClient.stop(instantTime);
}
}
+ @Override
+ protected void mayBeCleanAndArchive(HoodieTable table) {
+ autoArchiveOnCommit(table);
+ }
+
@Override
public void commitCompaction(
String compactionInstantTime,
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index f09cf106db2..cb046e2e91f 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -318,7 +318,7 @@ public class HoodieFlinkCopyOnWriteTable<T>
}
@Override
- public HoodieCleanMetadata clean(HoodieEngineContext context, String
cleanInstantTime, boolean skipLocking) {
+ public HoodieCleanMetadata clean(HoodieEngineContext context, String
cleanInstantTime) {
return new CleanActionExecutor(context, config, this,
cleanInstantTime).execute();
}
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index 0f7f48194cd..af35078b9a9 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -206,7 +206,8 @@ public class HoodieJavaWriteClient<T> extends
result.getWriteStats().get().size());
}
- postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime,
Option.empty(), true);
+ postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime,
Option.empty());
+ mayBeCleanAndArchive(hoodieTable);
emitCommitMetrics(instantTime, result.getCommitMetadata().get(),
hoodieTable.getMetaClient().getCommitActionType());
}
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index f3899b38590..5431bfd0c4f 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -216,7 +216,7 @@ public class HoodieJavaCopyOnWriteTable<T>
@Override
public HoodieCleanMetadata clean(HoodieEngineContext context,
- String cleanInstantTime, boolean
skipLocking) {
+ String cleanInstantTime) {
return new CleanActionExecutor(context, config, this,
cleanInstantTime).execute();
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 5c094b8d5be..86f559260a7 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -267,7 +267,8 @@ public class SparkRDDWriteClient<T> extends
result.getWriteStats().get().size());
}
- postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime,
Option.empty(), true);
+ postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime,
Option.empty());
+ mayBeCleanAndArchive(hoodieTable);
emitCommitMetrics(instantTime, result.getCommitMetadata().get(),
hoodieTable.getMetaClient().getCommitActionType());
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 9691c8a9ab7..fa8f98022b4 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -259,8 +259,8 @@ public class HoodieSparkCopyOnWriteTable<T>
}
@Override
- public HoodieCleanMetadata clean(HoodieEngineContext context, String
cleanInstantTime, boolean skipLocking) {
- return new CleanActionExecutor<>(context, config, this, cleanInstantTime,
skipLocking).execute();
+ public HoodieCleanMetadata clean(HoodieEngineContext context, String
cleanInstantTime) {
+ return new CleanActionExecutor<>(context, config, this, cleanInstantTime,
false).execute();
}
@Override
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index fc59d0ff9e0..c2073c703d9 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -984,7 +984,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
}
@ParameterizedTest
- @CsvSource(value = {"false, AVRO", "false, SPARK"}) // TODO set asyncClean
to true; disabled due to lock acquiring issue (HUDI-5593)
+ @CsvSource(value = {"true, AVRO", "true, SPARK", "false, AVRO", "false,
SPARK"})
public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean,
HoodieRecordType recordType) throws Exception {
String tableBasePath = basePath + "/cleanerDeleteReplacedDataWithArchive"
+ asyncClean;
@@ -1052,7 +1052,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
configs.add(String.format("%s=%s",
HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "3"));
configs.add(String.format("%s=%s", HoodieCleanConfig.ASYNC_CLEAN.key(),
asyncClean));
configs.add(String.format("%s=%s",
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
- cfg.configs.add(String.format("%s=%s",
HoodieWriteConfig.MARKERS_TYPE.key(), "DIRECT"));
+ configs.add(String.format("%s=%s", HoodieWriteConfig.MARKERS_TYPE.key(),
"DIRECT"));
if (asyncClean) {
configs.add(String.format("%s=%s",
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()));