nsivabalan commented on a change in pull request #4588:
URL: https://github.com/apache/hudi/pull/4588#discussion_r786307558
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
##########
@@ -441,6 +445,135 @@ public void
testHoodieClientMultiWriterWithClustering(HoodieTableType tableType)
}
}
+ @Test
+ public void testHoodieClientMultiWriterAutoCommitForConflict() throws
Exception {
+ Properties properties = new Properties();
+ properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath +
"/.hoodie/.locks");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
"3000");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
"100");
+ HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withAutoClean(false).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
Review comment:
Will fix it. Copied from elsewhere, I guess.
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
##########
@@ -441,6 +445,135 @@ public void
testHoodieClientMultiWriterWithClustering(HoodieTableType tableType)
}
}
+ @Test
+ public void testHoodieClientMultiWriterAutoCommitForConflict() throws
Exception {
+ Properties properties = new Properties();
+ properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath +
"/.hoodie/.locks");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
"3000");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
"100");
+ HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withAutoClean(false).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+ // Timeline-server-based markers are not used for multi-writer tests
+ .withMarkersType(MarkerType.DIRECT.name())
+
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
+ .build()).withAutoCommit(true).withProperties(properties);
+ HoodieWriteConfig cfg = writeConfigBuilder.build();
+ HoodieWriteConfig cfg2 = writeConfigBuilder.build();
+
+ // Create the first commit
+ createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001",
5000, false);
+ // Start another inflight commit
+ String newCommitTime1 = "003";
+ String newCommitTime2 = "004";
+ SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+ SparkRDDWriteClient client2 = getHoodieWriteClient(cfg2);
+
+ List<HoodieRecord> updates1 = dataGen.generateUpdates(newCommitTime1,
5000);
+ List<HoodieRecord> updates2 = dataGen.generateUpdates(newCommitTime2,
5000);
+
+ JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(updates1, 4);
+ JavaRDD<HoodieRecord> writeRecords2 = jsc.parallelize(updates2, 4);
+
+ runConcurrentAndAssert(writeRecords1, writeRecords2, client1, client2,
SparkRDDWriteClient::upsert, true);
+ }
+
+ private void runConcurrentAndAssert(JavaRDD<HoodieRecord> writeRecords1,
JavaRDD<HoodieRecord> writeRecords2,
+ SparkRDDWriteClient client1,
SparkRDDWriteClient client2,
+ Function3<JavaRDD<WriteStatus>,
SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
+ boolean assertForConflict) throws
ExecutionException, InterruptedException {
+
+ CountDownLatch runCountDownLatch = new CountDownLatch(2);
+ final ExecutorService executors = Executors.newFixedThreadPool(2);
+ String newCommitTime1 = "003";
+ String newCommitTime2 = "004";
+
+ AtomicBoolean client1Succeeded = new AtomicBoolean(true);
+ AtomicBoolean client2Succeeded = new AtomicBoolean(true);
+
+ Future future1 = executors.submit(() -> {
+ try {
+ ingestBatch(writeFn, client1, newCommitTime1, writeRecords1,
runCountDownLatch);
+ } catch (IOException e) {
+ e.printStackTrace();
Review comment:
Will fix it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]