xushiyan commented on code in PR #7826:
URL: https://github.com/apache/hudi/pull/7826#discussion_r1195279932
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -759,20 +759,37 @@ protected List<String>
getInstantsToRollback(HoodieTableMetaClient metaClient, H
}
}).collect(Collectors.toList());
} else if (cleaningPolicy.isLazy()) {
- return inflightInstantsStream.filter(instant -> {
- try {
- return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
- } catch (IOException io) {
- throw new HoodieException("Failed to check heartbeat for instant " +
instant, io);
- }
- }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+ return getInstantsToRollbackForLazyCleanPolicy(metaClient,
inflightInstantsStream);
} else if (cleaningPolicy.isNever()) {
return Collections.emptyList();
} else {
throw new IllegalArgumentException("Invalid Failed Writes Cleaning
Policy " + config.getFailedWritesCleanPolicy());
}
}
+ protected List<String>
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClient metaClient,
+
Stream<HoodieInstant> inflightInstantsStream) {
+ // Get expired instants, must store them into list before double-checking
+ List<String> expiredInstants = inflightInstantsStream.filter(instant -> {
+ try {
+ // An instant transformed from inflight to completed have no heartbeat
file and will be detected as expired instant here
+ return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
+ } catch (IOException io) {
+ throw new HoodieException("Failed to check heartbeat for instant " +
instant, io);
+ }
+ }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+
+ if (!expiredInstants.isEmpty()) {
+ LOG.info("Found expired instants to rollback for lazy clean: " +
String.join(",", expiredInstants));
+ // Only return instants that haven't been completed by other writers
+ metaClient.reloadActiveTimeline();
+ HoodieTimeline latestInflightTimeline =
getInflightTimelineExcludeCompactionAndClustering(metaClient);
Review Comment:
"refreshed" is more accurate
```suggestion
HoodieTimeline refreshedInflightTimeline =
getInflightTimelineExcludeCompactionAndClustering(metaClient);
```
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -759,20 +759,37 @@ protected List<String>
getInstantsToRollback(HoodieTableMetaClient metaClient, H
}
}).collect(Collectors.toList());
} else if (cleaningPolicy.isLazy()) {
- return inflightInstantsStream.filter(instant -> {
- try {
- return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
- } catch (IOException io) {
- throw new HoodieException("Failed to check heartbeat for instant " +
instant, io);
- }
- }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+ return getInstantsToRollbackForLazyCleanPolicy(metaClient,
inflightInstantsStream);
} else if (cleaningPolicy.isNever()) {
return Collections.emptyList();
} else {
throw new IllegalArgumentException("Invalid Failed Writes Cleaning
Policy " + config.getFailedWritesCleanPolicy());
}
}
+ protected List<String>
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClient metaClient,
Review Comment:
`protected` implies this can potentially be overridden by subclasses, while
in fact this is not meant for overriding. We should avoid misusing this access
modifier. If you want to test it, annotate it with `@VisibleForTesting` and make
it package-private.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -545,6 +546,120 @@ public void
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
assertTrue(validInstants.containsAll(completedInstants));
}
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class, names = {"MERGE_ON_READ",
"COPY_ON_WRITE"})
+ public void testMultiWriterWithAsyncLazyCleanRollback(HoodieTableType
tableType) throws Exception {
+ // create inserts X 1
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ setUpMORTestTable();
+ }
+ // Disabling embedded timeline server, it doesn't work with multiwriter
+ HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withAutoClean(false)
+ .withAsyncClean(true)
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+ .withEmbeddedTimelineServerEnabled(false)
+ // Timeline-server-based markers are not used for multi-writer
tests
+ .withMarkersType(MarkerType.DIRECT.name())
+
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
+ FileSystemViewStorageType.MEMORY).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ // Set the config so that heartbeat will expire in 1 second
without update
+
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
+
.build()).withAutoCommit(false).withProperties(lockProperties);
+ Set<String> validInstants = new HashSet<>();
+ // Create the first commit with inserts
+ HoodieWriteConfig cfg = writeConfigBuilder.build();
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+ createCommitWithInserts(cfg, client, "000", "001", 200, true);
+ validInstants.add("001");
+
+ // Three clients running actions in parallel
+ final int threadCount = 3;
+ final ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+
+ final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+ final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+ final String commitTime2 = "002";
+ final String commitTime3 = "003";
+ AtomicReference<Object> writeStatus1 = new AtomicReference<>(null);
+ AtomicReference<Object> writeStatus2 = new AtomicReference<>(null);
+
+ Future future1 = executor.submit(() -> {
+ final int numRecords = 100;
+ assertDoesNotThrow(() -> {
+ writeStatus1.set(createCommitWithInserts(cfg, client1, "001",
commitTime2, numRecords, false));
+ });
+ });
+ Future future2 = executor.submit(() -> {
+ final int numRecords = 100;
+ assertDoesNotThrow(() -> {
+ writeStatus2.set(createCommitWithInserts(cfg, client2, "001",
commitTime3, numRecords, false));
+ });
+ });
+
+ future1.get();
+ future2.get();
+
+ final CountDownLatch commitCountDownLatch = new CountDownLatch(2);
+ HoodieTableMetaClient tableMetaClient =
client.getTableServiceClient().createMetaClient(true);
+
+ // Get inflight instant stream before commit
+ List<HoodieInstant> inflightInstants = client
+ .getTableServiceClient()
+ .getInflightTimelineExcludeCompactionAndClustering(tableMetaClient)
+ .getReverseOrderedInstants()
+ .collect(Collectors.toList());
+
+ // Commit the instants and get instants to rollback in parallel
+ future1 = executor.submit(() -> {
+ long start = System.currentTimeMillis();
+ LOG.info(String.format("Start to commit instant %s", commitTime2));
+ client1.commit(commitTime2, writeStatus1.get());
+ commitCountDownLatch.countDown();
+ validInstants.add(commitTime2);
+ LOG.info(String.format("commit the instant cost %d ms",
System.currentTimeMillis() - start));
+ });
+
+ future2 = executor.submit(() -> {
+ long start = System.currentTimeMillis();
+ LOG.info(String.format("Start to commit instant %s", commitTime3));
+ client2.commit(commitTime3, writeStatus2.get());
+ commitCountDownLatch.countDown();
+ validInstants.add(commitTime3);
+ LOG.info(String.format("commit the instant %s cost %d ms", commitTime3,
System.currentTimeMillis() - start));
+ });
+
+ Future future3 = executor.submit(() -> {
+ try {
+ commitCountDownLatch.await(30000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ //
+ }
+ LOG.info("Start to get instants to rollback");
+ List<String> instantsToRollback =
+
client.getTableServiceClient().getInstantsToRollbackForLazyCleanPolicy(tableMetaClient,
inflightInstants.stream());
+ // No instants will be rollback though some instants may be detected as
`expired`
+ assertTrue(instantsToRollback.isEmpty());
Review Comment:
why not leave commit2 inflight so we have 1 instant to return here, while
having commit3 completed and therefore not included? This would give better coverage.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -545,6 +546,120 @@ public void
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
assertTrue(validInstants.containsAll(completedInstants));
}
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class, names = {"MERGE_ON_READ",
"COPY_ON_WRITE"})
+ public void testMultiWriterWithAsyncLazyCleanRollback(HoodieTableType
tableType) throws Exception {
+ // create inserts X 1
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ setUpMORTestTable();
+ }
+ // Disabling embedded timeline server, it doesn't work with multiwriter
+ HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withAutoClean(false)
+ .withAsyncClean(true)
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+ .withEmbeddedTimelineServerEnabled(false)
+ // Timeline-server-based markers are not used for multi-writer
tests
+ .withMarkersType(MarkerType.DIRECT.name())
+
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
+ FileSystemViewStorageType.MEMORY).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ // Set the config so that heartbeat will expire in 1 second
without update
+
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
+
.build()).withAutoCommit(false).withProperties(lockProperties);
+ Set<String> validInstants = new HashSet<>();
+ // Create the first commit with inserts
+ HoodieWriteConfig cfg = writeConfigBuilder.build();
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+ createCommitWithInserts(cfg, client, "000", "001", 200, true);
+ validInstants.add("001");
+
+ // Three clients running actions in parallel
+ final int threadCount = 3;
+ final ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+
+ final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+ final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+ final String commitTime2 = "002";
+ final String commitTime3 = "003";
+ AtomicReference<Object> writeStatus1 = new AtomicReference<>(null);
+ AtomicReference<Object> writeStatus2 = new AtomicReference<>(null);
+
+ Future future1 = executor.submit(() -> {
+ final int numRecords = 100;
+ assertDoesNotThrow(() -> {
+ writeStatus1.set(createCommitWithInserts(cfg, client1, "001",
commitTime2, numRecords, false));
+ });
+ });
+ Future future2 = executor.submit(() -> {
+ final int numRecords = 100;
+ assertDoesNotThrow(() -> {
+ writeStatus2.set(createCommitWithInserts(cfg, client2, "001",
commitTime3, numRecords, false));
+ });
+ });
+
+ future1.get();
+ future2.get();
+
+ final CountDownLatch commitCountDownLatch = new CountDownLatch(2);
+ HoodieTableMetaClient tableMetaClient =
client.getTableServiceClient().createMetaClient(true);
+
+ // Get inflight instant stream before commit
+ List<HoodieInstant> inflightInstants = client
+ .getTableServiceClient()
+ .getInflightTimelineExcludeCompactionAndClustering(tableMetaClient)
+ .getReverseOrderedInstants()
+ .collect(Collectors.toList());
+
+ // Commit the instants and get instants to rollback in parallel
+ future1 = executor.submit(() -> {
+ long start = System.currentTimeMillis();
+ LOG.info(String.format("Start to commit instant %s", commitTime2));
+ client1.commit(commitTime2, writeStatus1.get());
+ commitCountDownLatch.countDown();
+ validInstants.add(commitTime2);
+ LOG.info(String.format("commit the instant cost %d ms",
System.currentTimeMillis() - start));
+ });
+
+ future2 = executor.submit(() -> {
+ long start = System.currentTimeMillis();
+ LOG.info(String.format("Start to commit instant %s", commitTime3));
+ client2.commit(commitTime3, writeStatus2.get());
+ commitCountDownLatch.countDown();
+ validInstants.add(commitTime3);
+ LOG.info(String.format("commit the instant %s cost %d ms", commitTime3,
System.currentTimeMillis() - start));
+ });
+
+ Future future3 = executor.submit(() -> {
+ try {
+ commitCountDownLatch.await(30000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ //
+ }
+ LOG.info("Start to get instants to rollback");
+ List<String> instantsToRollback =
+
client.getTableServiceClient().getInstantsToRollbackForLazyCleanPolicy(tableMetaClient,
inflightInstants.stream());
+ // No instants will be rollback though some instants may be detected as
`expired`
+ assertTrue(instantsToRollback.isEmpty());
+ });
+
+ future1.get();
+ future2.get();
+ future3.get();
+
+ validInstants.addAll(
+ metaClient.reloadActiveTimeline().getCompletedReplaceTimeline()
+
.filterCompletedInstants().getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()));
+ Set<String> completedInstants =
metaClient.reloadActiveTimeline().getCommitsTimeline()
+
.filterCompletedInstants().getInstantsAsStream().map(HoodieInstant::getTimestamp)
+ .collect(Collectors.toSet());
+ assertTrue(validInstants.containsAll(completedInstants));
Review Comment:
what is the purpose of this assertion here? looks redundant
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -759,20 +759,37 @@ protected List<String>
getInstantsToRollback(HoodieTableMetaClient metaClient, H
}
}).collect(Collectors.toList());
} else if (cleaningPolicy.isLazy()) {
- return inflightInstantsStream.filter(instant -> {
- try {
- return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
- } catch (IOException io) {
- throw new HoodieException("Failed to check heartbeat for instant " +
instant, io);
- }
- }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+ return getInstantsToRollbackForLazyCleanPolicy(metaClient,
inflightInstantsStream);
} else if (cleaningPolicy.isNever()) {
return Collections.emptyList();
} else {
throw new IllegalArgumentException("Invalid Failed Writes Cleaning
Policy " + config.getFailedWritesCleanPolicy());
}
}
+ protected List<String>
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClient metaClient,
+
Stream<HoodieInstant> inflightInstantsStream) {
+ // Get expired instants, must store them into list before double-checking
+ List<String> expiredInstants = inflightInstantsStream.filter(instant -> {
+ try {
+ // An instant transformed from inflight to completed have no heartbeat
file and will be detected as expired instant here
+ return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
+ } catch (IOException io) {
+ throw new HoodieException("Failed to check heartbeat for instant " +
instant, io);
+ }
+ }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+
+ if (!expiredInstants.isEmpty()) {
+ LOG.info("Found expired instants to rollback for lazy clean: " +
String.join(",", expiredInstants));
Review Comment:
this log is misleading, since the list may be filtered further below. It should
log the final result instead — that is what is worth logging at INFO level here.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -545,6 +546,120 @@ public void
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
assertTrue(validInstants.containsAll(completedInstants));
}
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class, names = {"MERGE_ON_READ",
"COPY_ON_WRITE"})
+ public void testMultiWriterWithAsyncLazyCleanRollback(HoodieTableType
tableType) throws Exception {
+ // create inserts X 1
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ setUpMORTestTable();
+ }
+ // Disabling embedded timeline server, it doesn't work with multiwriter
+ HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withAutoClean(false)
+ .withAsyncClean(true)
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+ .withEmbeddedTimelineServerEnabled(false)
+ // Timeline-server-based markers are not used for multi-writer
tests
+ .withMarkersType(MarkerType.DIRECT.name())
+
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
+ FileSystemViewStorageType.MEMORY).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ // Set the config so that heartbeat will expire in 1 second
without update
+
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
+
.build()).withAutoCommit(false).withProperties(lockProperties);
+ Set<String> validInstants = new HashSet<>();
+ // Create the first commit with inserts
+ HoodieWriteConfig cfg = writeConfigBuilder.build();
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+ createCommitWithInserts(cfg, client, "000", "001", 200, true);
+ validInstants.add("001");
+
+ // Three clients running actions in parallel
+ final int threadCount = 3;
+ final ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+
+ final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+ final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+ final String commitTime2 = "002";
+ final String commitTime3 = "003";
+ AtomicReference<Object> writeStatus1 = new AtomicReference<>(null);
+ AtomicReference<Object> writeStatus2 = new AtomicReference<>(null);
+
+ Future future1 = executor.submit(() -> {
+ final int numRecords = 100;
+ assertDoesNotThrow(() -> {
+ writeStatus1.set(createCommitWithInserts(cfg, client1, "001",
commitTime2, numRecords, false));
+ });
+ });
+ Future future2 = executor.submit(() -> {
+ final int numRecords = 100;
+ assertDoesNotThrow(() -> {
+ writeStatus2.set(createCommitWithInserts(cfg, client2, "001",
commitTime3, numRecords, false));
+ });
+ });
+
+ future1.get();
+ future2.get();
+
+ final CountDownLatch commitCountDownLatch = new CountDownLatch(2);
+ HoodieTableMetaClient tableMetaClient =
client.getTableServiceClient().createMetaClient(true);
+
+ // Get inflight instant stream before commit
+ List<HoodieInstant> inflightInstants = client
+ .getTableServiceClient()
+ .getInflightTimelineExcludeCompactionAndClustering(tableMetaClient)
+ .getReverseOrderedInstants()
+ .collect(Collectors.toList());
+
+ // Commit the instants and get instants to rollback in parallel
+ future1 = executor.submit(() -> {
+ long start = System.currentTimeMillis();
+ LOG.info(String.format("Start to commit instant %s", commitTime2));
+ client1.commit(commitTime2, writeStatus1.get());
+ commitCountDownLatch.countDown();
+ validInstants.add(commitTime2);
+ LOG.info(String.format("commit the instant cost %d ms",
System.currentTimeMillis() - start));
Review Comment:
this log is noisy
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -545,6 +546,120 @@ public void
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
assertTrue(validInstants.containsAll(completedInstants));
}
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class, names = {"MERGE_ON_READ",
"COPY_ON_WRITE"})
+ public void testMultiWriterWithAsyncLazyCleanRollback(HoodieTableType
tableType) throws Exception {
+ // create inserts X 1
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ setUpMORTestTable();
+ }
+ // Disabling embedded timeline server, it doesn't work with multiwriter
+ HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withAutoClean(false)
+ .withAsyncClean(true)
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+ .withEmbeddedTimelineServerEnabled(false)
+ // Timeline-server-based markers are not used for multi-writer
tests
+ .withMarkersType(MarkerType.DIRECT.name())
+
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
+ FileSystemViewStorageType.MEMORY).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ // Set the config so that heartbeat will expire in 1 second
without update
+
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
+
.build()).withAutoCommit(false).withProperties(lockProperties);
+ Set<String> validInstants = new HashSet<>();
+ // Create the first commit with inserts
+ HoodieWriteConfig cfg = writeConfigBuilder.build();
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+ createCommitWithInserts(cfg, client, "000", "001", 200, true);
+ validInstants.add("001");
+
+ // Three clients running actions in parallel
+ final int threadCount = 3;
+ final ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+
+ final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+ final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+ final String commitTime2 = "002";
+ final String commitTime3 = "003";
+ AtomicReference<Object> writeStatus1 = new AtomicReference<>(null);
+ AtomicReference<Object> writeStatus2 = new AtomicReference<>(null);
+
+ Future future1 = executor.submit(() -> {
+ final int numRecords = 100;
+ assertDoesNotThrow(() -> {
+ writeStatus1.set(createCommitWithInserts(cfg, client1, "001",
commitTime2, numRecords, false));
+ });
+ });
+ Future future2 = executor.submit(() -> {
+ final int numRecords = 100;
+ assertDoesNotThrow(() -> {
+ writeStatus2.set(createCommitWithInserts(cfg, client2, "001",
commitTime3, numRecords, false));
+ });
+ });
+
+ future1.get();
+ future2.get();
+
+ final CountDownLatch commitCountDownLatch = new CountDownLatch(2);
+ HoodieTableMetaClient tableMetaClient =
client.getTableServiceClient().createMetaClient(true);
+
+ // Get inflight instant stream before commit
+ List<HoodieInstant> inflightInstants = client
+ .getTableServiceClient()
+ .getInflightTimelineExcludeCompactionAndClustering(tableMetaClient)
+ .getReverseOrderedInstants()
+ .collect(Collectors.toList());
+
+ // Commit the instants and get instants to rollback in parallel
+ future1 = executor.submit(() -> {
+ long start = System.currentTimeMillis();
+ LOG.info(String.format("Start to commit instant %s", commitTime2));
+ client1.commit(commitTime2, writeStatus1.get());
+ commitCountDownLatch.countDown();
+ validInstants.add(commitTime2);
+ LOG.info(String.format("commit the instant cost %d ms",
System.currentTimeMillis() - start));
+ });
+
+ future2 = executor.submit(() -> {
+ long start = System.currentTimeMillis();
+ LOG.info(String.format("Start to commit instant %s", commitTime3));
+ client2.commit(commitTime3, writeStatus2.get());
+ commitCountDownLatch.countDown();
+ validInstants.add(commitTime3);
+ LOG.info(String.format("commit the instant %s cost %d ms", commitTime3,
System.currentTimeMillis() - start));
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]