[
https://issues.apache.org/jira/browse/HUDI-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Davis Zhang updated HUDI-9083:
------------------------------
Fix Version/s: 1.0.2
> testMultiWriterWithAsyncTableServicesWithConflict is flaky
> ----------------------------------------------------------
>
> Key: HUDI-9083
> URL: https://issues.apache.org/jira/browse/HUDI-9083
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Davis Zhang
> Priority: Major
> Fix For: 1.0.2
>
>
> {code:java}
> public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType
> tableType, Class<? extends LockProvider<?>> providerClass,
>
> ConflictResolutionStrategy resolutionStrategy) throws Exception {
> // create inserts X 1
> if (tableType == MERGE_ON_READ) {
> setUpMORTestTable();
> }
> // Use RDD API to perform clustering (TODO: Fix row-writer API)
> Properties properties = new Properties();
> properties.put("hoodie.datasource.write.row.writer.enable",
> String.valueOf(false));
> // Disabling embedded timeline server, it doesn't work with multiwriter
> HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
> .withCleanConfig(HoodieCleanConfig.newBuilder()
> .withAutoClean(false)
> .withAsyncClean(true)
> .retainCommits(0)
>
> .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
> .withCompactionConfig(HoodieCompactionConfig.newBuilder()
> .withInlineCompaction(false)
> .withMaxNumDeltaCommitsBeforeCompaction(2).build())
> .withEmbeddedTimelineServerEnabled(false)
> // Timeline-server-based markers are not used for multi-writer tests
> .withMarkersType(MarkerType.DIRECT.name())
>
> .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
> FileSystemViewStorageType.MEMORY).build())
>
> .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
>
> .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(providerClass)
> .withConflictResolutionStrategy(resolutionStrategy)
> .build()).withAutoCommit(false).withProperties(lockProperties)
> .withProperties(properties);
> Set<String> validInstants = new HashSet<>();
> // Create the first commit with inserts
> HoodieWriteConfig cfg = writeConfigBuilder.build();
> SparkRDDWriteClient client = getHoodieWriteClient(cfg);
> String firstCommitTime = client.createNewInstantTime();
> createCommitWithInserts(cfg, client, "000", firstCommitTime, 200, true);
> validInstants.add(firstCommitTime);
> // Create 2 commits with upserts
> String secondCommitTime = client.createNewInstantTime();
> createCommitWithUpserts(cfg, client, firstCommitTime, Option.of("000"),
> secondCommitTime, 100);
> String thirdCommitTime = client.createNewInstantTime();
> createCommitWithUpserts(cfg, client, secondCommitTime, Option.of("000"),
> thirdCommitTime, 100);
> validInstants.add(secondCommitTime);
> validInstants.add(thirdCommitTime);
> // Three clients running actions in parallel
> final int threadCount = 3;
> final CountDownLatch scheduleCountDownLatch = new
> CountDownLatch(threadCount);
> final ExecutorService executors = Executors.newFixedThreadPool(threadCount);
> // Write config with clustering enabled
> final HoodieWriteConfig cfg2 = writeConfigBuilder
> .withClusteringConfig(HoodieClusteringConfig.newBuilder()
> .withInlineClustering(true)
> .withInlineClusteringNumCommits(1)
> .build())
> .build();
> final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
> final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
> final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);
> // Test with concurrent operations could be flaky, to reduce possibility of wrong ordering some queue is added
> // For InProcessLockProvider we could wait less
> final int waitAndRunFirst =
> providerClass.isAssignableFrom(InProcessLockProvider.class) ? 2000 : 20000;
> final int waitAndRunSecond =
> providerClass.isAssignableFrom(InProcessLockProvider.class) ? 3000 : 30000;
> // Create upserts, schedule cleaning, schedule compaction in parallel
> Future future1 = executors.submit(() -> {
> final String newCommitTime = client1.createNewInstantTime();
> final int numRecords = 100;
> final String commitTimeBetweenPrevAndNew = secondCommitTime;
> // We want the upsert to go through only after the compaction
> // and cleaning schedule completion. So, waiting on latch here.
> latchCountDownAndWait(scheduleCountDownLatch, waitAndRunSecond);
> if (tableType == HoodieTableType.MERGE_ON_READ && !(resolutionStrategy
> instanceof PreferWriterConflictResolutionStrategy)) {
> // HUDI-6897: Improve SimpleConcurrentFileWritesConflictResolutionStrategy for NB-CC
> // There is no need to throw concurrent modification exception for the simple strategy under NB-CC, because the compactor would finally resolve the conflicts instead.
> // vvvvvvvvvvvvvvvvvvvvvvvv this assertion failed vvvvvvvvvvvvvvvvvvvvvvvvvvvv
> // Since the concurrent modifications went in, this upsert has
> // to fail
> assertThrows(HoodieWriteConflictException.class, () -> {
> createCommitWithUpserts(cfg, client1, thirdCommitTime,
> Option.of(commitTimeBetweenPrevAndNew), newCommitTime, numRecords);
> });
> } else {
> // We don't have the compaction for COW and so this upsert
> // has to pass
> assertDoesNotThrow(() -> {
> createCommitWithUpserts(cfg, client1, thirdCommitTime,
> Option.of(commitTimeBetweenPrevAndNew), newCommitTime, numRecords);
> });
> validInstants.add(newCommitTime);
> }
> });
> Future future2 = executors.submit(() -> {
> if (tableType == MERGE_ON_READ) {
> assertDoesNotThrow(() -> {
> String compactionTimeStamp = client2.createNewInstantTime();
> client2.scheduleTableService(compactionTimeStamp, Option.empty(),
> TableServiceType.COMPACT);
> });
> }
> latchCountDownAndWait(scheduleCountDownLatch, waitAndRunFirst);
> });
> Future future3 = executors.submit(() -> {
> assertDoesNotThrow(() -> {
> latchCountDownAndWait(scheduleCountDownLatch, waitAndRunFirst);
> String cleanCommitTime = client3.createNewInstantTime();
> client3.scheduleTableService(cleanCommitTime, Option.empty(),
> TableServiceType.CLEAN);
> });
> });
> future1.get();
> {code}
>
> spark-tests (scala-2.12, spark3.5)
> {code:java}
> Caused by: org.opentest4j.AssertionFailedError: Expected org.apache.hudi.exception.HoodieWriteConflictException to be thrown, but nothing was thrown.
> 6648 at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:71)
> 6649 at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:37)
> 6650 at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3082)
> 6651 at org.apache.hudi.client.TestHoodieClientMultiWriter.lambda$testMultiWriterWithAsyncTableServicesWithConflict$12(TestHoodieClientMultiWriter.java:837)
> 6652 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 6653 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 6654 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 6655 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 6656 at java.lang.Thread.run(Thread.java:750)
> {code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)