agnes-xinyi-lu commented on code in PR #15126:
URL: https://github.com/apache/iceberg/pull/15126#discussion_r2747225959
##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -4394,6 +4402,102 @@ private void expectNotModifiedResponseForLoadTable(
any());
}
+ /**
+ * Test concurrent appends on multiple branches simultaneously to verify proper handling of
+ * sequence number conflicts.
+ *
+ * <p>Creates 5 different branches on the table, then performs 10 parallel append commits on each
+ * branch at the same time (50 total concurrent operations). This verifies that: 1. Sequence
+ * number conflicts are caught by AssertLastSequenceNumber requirement 2. Conflicts result in
+ * CommitFailedException (retryable) not ValidationException (non-retryable) 3. The REST catalog
+ * properly handles concurrent modifications across different branches
+ */
+ @Test
+ public void testConcurrentAppendsOnMultipleBranches() {
+ int numBranches = 5;
+ int commitsPerBranch = 10;
+ int totalConcurrentWrites = numBranches * commitsPerBranch;
+
+ RESTCatalog restCatalog = catalog();
+
+ Namespace ns = Namespace.of("concurrent_test");
+ TableIdentifier tableIdent = TableIdentifier.of(ns, "test_table");
+
+ restCatalog.createNamespace(ns);
+ Table table = restCatalog.buildTable(tableIdent, SCHEMA).withPartitionSpec(SPEC).create();
+
+ // Add initial data to the main branch
+ table.newFastAppend().appendFile(FILE_A).commit();
+
+ // Create 5 branches from the main branch
+ String[] branchNames = new String[numBranches];
+ for (int i = 0; i < numBranches; i++) {
+ branchNames[i] = "branch-" + i;
+ table.manageSnapshots().createBranch(branchNames[i]).commit();
+ }
+
+ // Refresh to get updated metadata with all branches
+ restCatalog.loadTable(tableIdent);
+
+ AtomicInteger successCount = new AtomicInteger(0);
+ AtomicInteger validationFailureCount = new AtomicInteger(0);
+
+ ExecutorService executor =
+ MoreExecutors.getExitingExecutorService(
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(totalConcurrentWrites));
+
+ Tasks.range(totalConcurrentWrites)
+ .executeWith(executor)
+ .suppressFailureWhenFinished()
+ .onFailure(
+ (taskIndex, exception) -> {
+ // Check if sequence number validation error (indicates fix not working)
+ if (exception instanceof BadRequestException
+ && exception.getMessage().contains("Cannot add snapshot with sequence number")) {
+ validationFailureCount.incrementAndGet();
+ } else if (exception instanceof ValidationException) {
+ validationFailureCount.incrementAndGet();
+ }
+ // CommitFailedException is expected - this is the correct retryable behavior
+ })
+ .run(
+ taskIndex -> {
+ int branchIdx = taskIndex / commitsPerBranch;
+ int commitIdx = taskIndex % commitsPerBranch;
+ String branchName = branchNames[branchIdx];
+
+ // Each thread loads the table independently
+ Table localTable = restCatalog.loadTable(tableIdent);
+
+ // Create a unique file for this commit
+ DataFile newFile =
+ DataFiles.builder(SPEC)
+ .withPath(
+ String.format(
+ "/path/to/branch-%d-commit-%d.parquet", branchIdx, commitIdx))
+ .withFileSizeInBytes(15)
+ .withPartitionPath(String.format("id_bucket=%d", branchIdx % 16))
+ .withRecordCount(3)
+ .build();
+
+ // Append to the specific branch
+ localTable.newFastAppend().appendFile(newFile).toBranch(branchName).commit();
+
+ successCount.incrementAndGet();
+ });
+
+ // Verify the fix: with AssertLastSequenceNumber, there should be NO validation failures
+ // All concurrent conflicts should be caught as CommitFailedException (retryable)
+ assertThat(validationFailureCount.get())
+ .as(
+ "With the fix, sequence number conflicts should be caught by AssertLastSequenceNumber "
+ + "and throw CommitFailedException (retryable), not ValidationException")
+ .isEqualTo(0);
+
+ // At least some should succeed (commits that don't conflict or succeed after retry)
+ assertThat(successCount.get()).as("At least some appends should succeed").isGreaterThan(0);
Review Comment:
Understood. I updated the test to verify the total number of commits and to
make sure there are failures/conflicts. For the total number of conflicts,
it's hard to get a deterministic number (please let me know if you have an
easy way; happy to implement it). The main problem is that
table.appendFiles.commit is not an atomic operation: it refreshes
TableMetadata in SnapshotProducer. To create a deterministic conflict, we
would need to put a barrier there or in TableOperations so that every thread
gets exactly the same base TableMetadata.
I think the purpose of this test is to verify that conflicts can occur during
commit when multiple threads commit to a non-current branch at the same time,
and that the fix guarantees the resulting exception is retryable.
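To illustrate the barrier idea outside of Iceberg, here is a minimal sketch. It does not use SnapshotProducer or TableOperations; the class name `BarrierConflictSketch` and the single `version` counter are hypothetical stand-ins for the table metadata and its compare-and-swap commit. Each thread reads the base version, waits on a `CyclicBarrier` until every other thread has read it too, and only then attempts the commit, so the number of conflicts becomes deterministic.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierConflictSketch {
  // Hypothetical stand-in for table metadata: a single version counter.
  static final AtomicInteger version = new AtomicInteger(0);

  /** Runs `writers` threads that share one base version; returns {successes, conflicts}. */
  static int[] run(int writers) throws Exception {
    version.set(0);
    CyclicBarrier barrier = new CyclicBarrier(writers);
    AtomicInteger successes = new AtomicInteger();
    AtomicInteger conflicts = new AtomicInteger();
    // The pool must have one thread per writer, otherwise the barrier never trips.
    ExecutorService pool = Executors.newFixedThreadPool(writers);
    for (int i = 0; i < writers; i++) {
      pool.submit(
          () -> {
            int base = version.get(); // read the base "metadata"
            barrier.await(); // wait until every thread has read the same base
            // Commit only if nobody else has committed on top of the base.
            if (version.compareAndSet(base, base + 1)) {
              successes.incrementAndGet();
            } else {
              conflicts.incrementAndGet(); // this is where a retryable failure would surface
            }
            return null;
          });
    }
    pool.shutdown();
    if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
      throw new IllegalStateException("writers did not finish in time");
    }
    return new int[] {successes.get(), conflicts.get()};
  }

  public static void main(String[] args) throws Exception {
    int[] result = run(5);
    System.out.println("successes=" + result[0] + " conflicts=" + result[1]);
  }
}
```

Because no thread passes the barrier until all of them have read version 0, exactly one compare-and-set wins and the other four fail. In the real test the equivalent barrier would have to sit between the metadata refresh and the commit, which is why it would need a hook in SnapshotProducer or TableOperations.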
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]