Davis Zhang created HUDI-9083:
---------------------------------

             Summary: testMultiWriterWithAsyncTableServicesWithConflict is flaky
                 Key: HUDI-9083
                 URL: https://issues.apache.org/jira/browse/HUDI-9083
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: Davis Zhang


{code:java}
public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType 
tableType, Class<? extends LockProvider<?>> providerClass,
                                                              
ConflictResolutionStrategy resolutionStrategy) throws Exception {
  // create inserts X 1
  if (tableType == MERGE_ON_READ) {
    setUpMORTestTable();
  }

  // Use RDD API to perform clustering (TODO: Fix row-writer API)
  Properties properties = new Properties();
  properties.put("hoodie.datasource.write.row.writer.enable", 
String.valueOf(false));

  // Disabling embedded timeline server, it doesn't work with multiwriter
  HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
      .withCleanConfig(HoodieCleanConfig.newBuilder()
          .withAutoClean(false)
          .withAsyncClean(true)
          .retainCommits(0)
          
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
      .withCompactionConfig(HoodieCompactionConfig.newBuilder()
          .withInlineCompaction(false)
          .withMaxNumDeltaCommitsBeforeCompaction(2).build())
      .withEmbeddedTimelineServerEnabled(false)
      // Timeline-server-based markers are not used for multi-writer tests
      .withMarkersType(MarkerType.DIRECT.name())
      
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
          FileSystemViewStorageType.MEMORY).build())
      
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
      
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(providerClass)
          .withConflictResolutionStrategy(resolutionStrategy)
          .build()).withAutoCommit(false).withProperties(lockProperties)
      .withProperties(properties);

  Set<String> validInstants = new HashSet<>();

  // Create the first commit with inserts
  HoodieWriteConfig cfg = writeConfigBuilder.build();
  SparkRDDWriteClient client = getHoodieWriteClient(cfg);
  String firstCommitTime = client.createNewInstantTime();
  createCommitWithInserts(cfg, client, "000", firstCommitTime, 200, true);
  validInstants.add(firstCommitTime);

  // Create 2 commits with upserts
  String secondCommitTime = client.createNewInstantTime();
  createCommitWithUpserts(cfg, client, firstCommitTime, Option.of("000"), 
secondCommitTime, 100);
  String thirdCommitTime = client.createNewInstantTime();
  createCommitWithUpserts(cfg, client, secondCommitTime, Option.of("000"), 
thirdCommitTime, 100);
  validInstants.add(secondCommitTime);
  validInstants.add(thirdCommitTime);

  // Three clients running actions in parallel
  final int threadCount = 3;
  final CountDownLatch scheduleCountDownLatch = new CountDownLatch(threadCount);
  final ExecutorService executors = Executors.newFixedThreadPool(threadCount);

  // Write config with clustering enabled
  final HoodieWriteConfig cfg2 = writeConfigBuilder
      .withClusteringConfig(HoodieClusteringConfig.newBuilder()
          .withInlineClustering(true)
          .withInlineClusteringNumCommits(1)
          .build())
      .build();
  final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
  final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
  final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);

  // Test with concurrent operations could be flaky, to reduce possibility of 
wrong ordering some queue is added
  // For InProcessLockProvider we could wait less
  final int waitAndRunFirst = 
providerClass.isAssignableFrom(InProcessLockProvider.class) ? 2000 : 20000;
  final int waitAndRunSecond = 
providerClass.isAssignableFrom(InProcessLockProvider.class) ? 3000 : 30000;

  // Create upserts, schedule cleaning, schedule compaction in parallel
  Future future1 = executors.submit(() -> {
    final String newCommitTime = client1.createNewInstantTime();
    final int numRecords = 100;
    final String commitTimeBetweenPrevAndNew = secondCommitTime;

    // We want the upsert to go through only after the compaction
    // and cleaning schedule completion. So, waiting on latch here.
    latchCountDownAndWait(scheduleCountDownLatch, waitAndRunSecond);
    if (tableType == HoodieTableType.MERGE_ON_READ && !(resolutionStrategy 
instanceof PreferWriterConflictResolutionStrategy)) {
      // HUDI-6897: Improve 
SimpleConcurrentFileWritesConflictResolutionStrategy for NB-CC
      // There is no need to throw concurrent modification exception for the 
simple strategy under NB-CC, because the compactor would finally resolve the 
conflicts instead.


vvvvvvvvvvvv NOTE(reporter): the assertThrows immediately below is the assertion that fails intermittently vvvvvvvvvvvv
      // Since the concurrent modifications went in, this upsert has
      // to fail
      assertThrows(HoodieWriteConflictException.class, () -> {
        createCommitWithUpserts(cfg, client1, thirdCommitTime, 
Option.of(commitTimeBetweenPrevAndNew), newCommitTime, numRecords);
      });
    } else {
      // We don't have the compaction for COW and so this upsert
      // has to pass
      assertDoesNotThrow(() -> {
        createCommitWithUpserts(cfg, client1, thirdCommitTime, 
Option.of(commitTimeBetweenPrevAndNew), newCommitTime, numRecords);
      });
      validInstants.add(newCommitTime);
    }
  });

  Future future2 = executors.submit(() -> {
    if (tableType == MERGE_ON_READ) {
      assertDoesNotThrow(() -> {
        String compactionTimeStamp = client2.createNewInstantTime();
        client2.scheduleTableService(compactionTimeStamp, Option.empty(), 
TableServiceType.COMPACT);
      });
    }
    latchCountDownAndWait(scheduleCountDownLatch, waitAndRunFirst);
  });

  Future future3 = executors.submit(() -> {
    assertDoesNotThrow(() -> {
      latchCountDownAndWait(scheduleCountDownLatch, waitAndRunFirst);
      String cleanCommitTime = client3.createNewInstantTime();
      client3.scheduleTableService(cleanCommitTime, Option.empty(), 
TableServiceType.CLEAN);
    });
  });
  future1.get(); {code}
 

Failure observed in the CI job: spark-tests (scala-2.12, spark3.5)
{code:java}
Caused by: org.opentest4j.AssertionFailedError: Expected 
org.apache.hudi.exception.HoodieWriteConflictException to be thrown, but 
nothing was thrown.
6648    at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:71)
6649    at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:37)
6650    at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3082)
6651    at 
org.apache.hudi.client.TestHoodieClientMultiWriter.lambda$testMultiWriterWithAsyncTableServicesWithConflict$12(TestHoodieClientMultiWriter.java:837)
6652    at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
6653    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
6654    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
6655    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
6656    at java.lang.Thread.run(Thread.java:750) {code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to