[ 
https://issues.apache.org/jira/browse/HUDI-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-9083:
---------------------------------
    Labels: pull-request-available  (was: )

> testMultiWriterWithAsyncTableServicesWithConflict is flaky
> ----------------------------------------------------------
>
>                 Key: HUDI-9083
>                 URL: https://issues.apache.org/jira/browse/HUDI-9083
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Davis Zhang
>            Assignee: sivabalan narayanan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.0.2
>
>
> {code:java}
> public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType 
> tableType, Class<? extends LockProvider<?>> providerClass,
>                                                               
> ConflictResolutionStrategy resolutionStrategy) throws Exception {
>   // create inserts X 1
>   if (tableType == MERGE_ON_READ) {
>     setUpMORTestTable();
>   }
>   // Use RDD API to perform clustering (TODO: Fix row-writer API)
>   Properties properties = new Properties();
>   properties.put("hoodie.datasource.write.row.writer.enable", 
> String.valueOf(false));
>   // Disable the embedded timeline server; it doesn't work with multi-writer
>   HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
>       .withCleanConfig(HoodieCleanConfig.newBuilder()
>           .withAutoClean(false)
>           .withAsyncClean(true)
>           .retainCommits(0)
>           
> .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
>       .withCompactionConfig(HoodieCompactionConfig.newBuilder()
>           .withInlineCompaction(false)
>           .withMaxNumDeltaCommitsBeforeCompaction(2).build())
>       .withEmbeddedTimelineServerEnabled(false)
>       // Timeline-server-based markers are not used for multi-writer tests
>       .withMarkersType(MarkerType.DIRECT.name())
>       
> .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
>           FileSystemViewStorageType.MEMORY).build())
>       
> .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
>       
> .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(providerClass)
>           .withConflictResolutionStrategy(resolutionStrategy)
>           .build()).withAutoCommit(false).withProperties(lockProperties)
>       .withProperties(properties);
>   Set<String> validInstants = new HashSet<>();
>   // Create the first commit with inserts
>   HoodieWriteConfig cfg = writeConfigBuilder.build();
>   SparkRDDWriteClient client = getHoodieWriteClient(cfg);
>   String firstCommitTime = client.createNewInstantTime();
>   createCommitWithInserts(cfg, client, "000", firstCommitTime, 200, true);
>   validInstants.add(firstCommitTime);
>   // Create 2 commits with upserts
>   String secondCommitTime = client.createNewInstantTime();
>   createCommitWithUpserts(cfg, client, firstCommitTime, Option.of("000"), 
> secondCommitTime, 100);
>   String thirdCommitTime = client.createNewInstantTime();
>   createCommitWithUpserts(cfg, client, secondCommitTime, Option.of("000"), 
> thirdCommitTime, 100);
>   validInstants.add(secondCommitTime);
>   validInstants.add(thirdCommitTime);
>   // Three clients running actions in parallel
>   final int threadCount = 3;
>   final CountDownLatch scheduleCountDownLatch = new 
> CountDownLatch(threadCount);
>   final ExecutorService executors = Executors.newFixedThreadPool(threadCount);
>   // Write config with clustering enabled
>   final HoodieWriteConfig cfg2 = writeConfigBuilder
>       .withClusteringConfig(HoodieClusteringConfig.newBuilder()
>           .withInlineClustering(true)
>           .withInlineClusteringNumCommits(1)
>           .build())
>       .build();
>   final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
>   final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
>   final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);
>   // Tests with concurrent operations can be flaky; to reduce the possibility of 
> wrong ordering, some queueing is added
>   // For InProcessLockProvider we could wait less
>   final int waitAndRunFirst = 
> providerClass.isAssignableFrom(InProcessLockProvider.class) ? 2000 : 20000;
>   final int waitAndRunSecond = 
> providerClass.isAssignableFrom(InProcessLockProvider.class) ? 3000 : 30000;
>   // Create upserts, schedule cleaning, schedule compaction in parallel
>   Future future1 = executors.submit(() -> {
>     final String newCommitTime = client1.createNewInstantTime();
>     final int numRecords = 100;
>     final String commitTimeBetweenPrevAndNew = secondCommitTime;
>     // We want the upsert to go through only after the compaction
>     // and cleaning schedule completion. So, waiting on latch here.
>     latchCountDownAndWait(scheduleCountDownLatch, waitAndRunSecond);
>     if (tableType == HoodieTableType.MERGE_ON_READ && !(resolutionStrategy 
> instanceof PreferWriterConflictResolutionStrategy)) {
>       // HUDI-6897: Improve 
> SimpleConcurrentFileWritesConflictResolutionStrategy for NB-CC
>       // There is no need to throw a concurrent modification exception for the 
> simple strategy under NB-CC, because the compactor will eventually resolve the 
> conflicts instead.
> vvvvvvvvvvvvvvvvvvvvvvvv this assertion failed vvvvvvvvvvvvvvvvvvvvvvvvvvvv
>       // Since the concurrent modifications went in, this upsert has
>       // to fail
>       assertThrows(HoodieWriteConflictException.class, () -> {
>         createCommitWithUpserts(cfg, client1, thirdCommitTime, 
> Option.of(commitTimeBetweenPrevAndNew), newCommitTime, numRecords);
>       });
>     } else {
>       // We don't have the compaction for COW and so this upsert
>       // has to pass
>       assertDoesNotThrow(() -> {
>         createCommitWithUpserts(cfg, client1, thirdCommitTime, 
> Option.of(commitTimeBetweenPrevAndNew), newCommitTime, numRecords);
>       });
>       validInstants.add(newCommitTime);
>     }
>   });
>   Future future2 = executors.submit(() -> {
>     if (tableType == MERGE_ON_READ) {
>       assertDoesNotThrow(() -> {
>         String compactionTimeStamp = client2.createNewInstantTime();
>         client2.scheduleTableService(compactionTimeStamp, Option.empty(), 
> TableServiceType.COMPACT);
>       });
>     }
>     latchCountDownAndWait(scheduleCountDownLatch, waitAndRunFirst);
>   });
>   Future future3 = executors.submit(() -> {
>     assertDoesNotThrow(() -> {
>       latchCountDownAndWait(scheduleCountDownLatch, waitAndRunFirst);
>       String cleanCommitTime = client3.createNewInstantTime();
>       client3.scheduleTableService(cleanCommitTime, Option.empty(), 
> TableServiceType.CLEAN);
>     });
>   });
>   future1.get(); {code}
>  
> spark-tests (scala-2.12, spark3.5)
> {code:java}
> Caused by: org.opentest4j.AssertionFailedError: Expected 
> org.apache.hudi.exception.HoodieWriteConflictException to be thrown, but 
> nothing was thrown.
> 6648  at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:71)
> 6649  at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:37)
> 6650  at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3082)
> 6651  at 
> org.apache.hudi.client.TestHoodieClientMultiWriter.lambda$testMultiWriterWithAsyncTableServicesWithConflict$12(TestHoodieClientMultiWriter.java:837)
> 6652  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 6653  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 6654  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 6655  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 6656  at java.lang.Thread.run(Thread.java:750) {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to