manojpec commented on a change in pull request #4046:
URL: https://github.com/apache/hudi/pull/4046#discussion_r767073882
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
##########
@@ -238,82 +255,101 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t
createCommitWithUpserts(cfg, client, "002", "000", "003", 100);
validInstants.add("002");
validInstants.add("003");
- ExecutorService executors = Executors.newFixedThreadPool(2);
- // write config with clustering enabled
- HoodieWriteConfig cfg2 = writeConfigBuilder
-        .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(1).build())
+
+ // Three clients running actions in parallel
+ final int threadCount = 3;
+    final CountDownLatch scheduleCountDownLatch = new CountDownLatch(threadCount);
+    final ExecutorService executors = Executors.newFixedThreadPool(threadCount);
+
+ // Write config with clustering enabled
+ final HoodieWriteConfig cfg2 = writeConfigBuilder
+ .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+ .withInlineClustering(true)
+ .withInlineClusteringNumCommits(1)
+ .build())
.build();
- SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
- SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+ final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
+ final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+ final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);
+
// Create upserts, schedule cleaning, schedule compaction in parallel
Future future1 = executors.submit(() -> {
- String newCommitTime = "004";
- int numRecords = 100;
- String commitTimeBetweenPrevAndNew = "002";
- try {
-        createCommitWithUpserts(cfg2, client1, "003", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
- if (tableType == HoodieTableType.MERGE_ON_READ) {
- fail("Conflicts not handled correctly");
- }
- validInstants.add("004");
- } catch (Exception e1) {
- if (tableType == HoodieTableType.MERGE_ON_READ) {
- assertTrue(e1 instanceof HoodieWriteConflictException);
- }
+ final String newCommitTime = "004";
+ final int numRecords = 100;
+ final String commitTimeBetweenPrevAndNew = "002";
+
+      // We want the upsert to go through only after the compaction
+      // and cleaning have been scheduled, so wait on the latch here.
+ latchCountDownAndWait(scheduleCountDownLatch, 30000);
Review comment:
By having the count as 3, we can make sure all the clients have successfully finished scheduling and are ready to start the next set of actions in parallel. Your suggested approach of using a count of 1 would also work here (but without the workers doing the countdown themselves). Having the count at 3 gives us more controlled parallelism.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]