amogh-jahagirdar commented on code in PR #15126:
URL: https://github.com/apache/iceberg/pull/15126#discussion_r2785247722
##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3606,6 +3614,119 @@ protected RESTSessionCatalog newSessionCatalog(
return catalog;
}
+ /**
+ * Test concurrent appends on multiple branches simultaneously to verify proper handling of
+ * sequence number conflicts.
+ *
+ * <p>Uses a barrier to synchronize threads so they all load the same table state and commit
+ * simultaneously, creating deterministic conflicts. With retries disabled, we can verify exact
+ * counts: one success per round, and (numBranches - 1) CommitFailedExceptions per round.
Review Comment:
It looks like the test below doesn't verify exactly one success per round
(it only checks for at least one). I think this is due to the Awaitility
usage and the current barrier setup.
##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3606,6 +3614,119 @@ protected RESTSessionCatalog newSessionCatalog(
return catalog;
}
+ /**
+ * Test concurrent appends on multiple branches simultaneously to verify proper handling of
+ * sequence number conflicts.
+ *
+ * <p>Uses a barrier to synchronize threads so they all load the same table state and commit
+ * simultaneously, creating deterministic conflicts. With retries disabled, we can verify exact
+ * counts: one success per round, and (numBranches - 1) CommitFailedExceptions per round.
Review Comment:
```
  /**
   * Test concurrent appends on multiple branches simultaneously to verify proper handling of
   * sequence number conflicts.
   *
   * <p>Uses CyclicBarrier to synchronize threads so they all load the same table state and commit
   * simultaneously, creating deterministic conflicts. With retries disabled, we can verify exact
   * counts: one success per round, and (numBranches - 1) CommitFailedExceptions per round.
   *
   * <p>This verifies that: 1. Sequence number conflicts are caught by AssertLastSequenceNumber
   * requirement 2. Conflicts result in CommitFailedException (retryable) not ValidationException
   * (non-retryable) 3. The REST catalog properly handles concurrent modifications across branches
   */
  @Test
  public void testConcurrentAppendsOnMultipleBranches() {
    int numBranches = 5;
    int numRounds = 10;

    RESTCatalog catalog = catalog();

    Namespace ns = Namespace.of("concurrent_test");
    TableIdentifier tableIdent = TableIdentifier.of(ns, "test_table");

    catalog.createNamespace(ns);
    Table table = catalog.buildTable(tableIdent, SCHEMA).withPartitionSpec(SPEC).create();

    // Disable retries so we can count exact conflicts
    table.updateProperties().set(TableProperties.COMMIT_NUM_RETRIES, "-1").commit();

    // Create branches from the main branch
    String[] branchNames = new String[numBranches];
    ManageSnapshots manageSnapshots = table.manageSnapshots();
    for (int i = 0; i < numBranches; i++) {
      branchNames[i] = "branch-" + i;
      manageSnapshots = manageSnapshots.createBranch(branchNames[i]);
    }
    manageSnapshots.commit();

    CyclicBarrier startBarrier = new CyclicBarrier(numBranches);
    CyclicBarrier endBarrier = new CyclicBarrier(numBranches);

    AtomicIntegerArray successPerRound = new AtomicIntegerArray(numRounds);
    AtomicIntegerArray failuresPerRound = new AtomicIntegerArray(numRounds);
    AtomicInteger validationFailureCount = new AtomicInteger(0);

    ExecutorService executor =
        MoreExecutors.getExitingExecutorService(
            (ThreadPoolExecutor) Executors.newFixedThreadPool(numBranches));

    Tasks.range(numBranches)
        .stopOnFailure()
        .throwFailureWhenFinished()
        .executeWith(executor)
        .run(
            branchIdx -> {
              String branchName = branchNames[branchIdx];

              for (int round = 0; round < numRounds; round++) {
                // Load table
                Table localTable = catalog.loadTable(tableIdent);

                DataFile newFile =
                    DataFiles.builder(SPEC)
                        .withPath(
                            String.format("/path/to/branch-%d-round-%d.parquet", branchIdx, round))
                        .withFileSizeInBytes(15)
                        .withPartitionPath(String.format("id_bucket=%d", branchIdx % 16))
                        .withRecordCount(3)
                        .build();

                try {
                  // Wait for all threads to be ready to commit
                  startBarrier.await();

                  // All threads commit simultaneously
                  localTable.newFastAppend().appendFile(newFile).toBranch(branchName).commit();
                  successPerRound.incrementAndGet(round);
                } catch (CommitFailedException e) {
                  // Expected for conflicts - this is the correct behavior with the fix
                  failuresPerRound.incrementAndGet(round);
                } catch (ValidationException e) {
                  // This indicates the fix is not working
                  validationFailureCount.incrementAndGet();
                  throw e;
                } catch (Exception e) {
                  // Handle barrier exceptions
                  throw new RuntimeException(e);
                }

                try {
                  // Ensure all threads complete this round before starting the next
                  endBarrier.await();
                } catch (Exception e) {
                  throw new RuntimeException(e);
                }
              }
            });

    executor.shutdown();

    int totalAttempts = numBranches * numRounds;

    // Verify: no ValidationException should have been thrown
    assertThat(validationFailureCount.get())
        .as(
            "Sequence number conflicts should throw CommitFailedException (retryable), "
                + "not ValidationException")
        .isEqualTo(0);

    // Verify per-round exactly one success and (numBranches - 1) failures per round
    for (int round = 0; round < numRounds; round++) {
      assertThat(successPerRound.get(round))
          .as("Exactly one commit should succeed in round " + round)
          .isEqualTo(1);
      assertThat(failuresPerRound.get(round))
          .as("Exactly (numBranches - 1) commits should fail in round " + round)
          .isEqualTo(numBranches - 1);
    }

    // Verify no attempts were lost
    int totalSuccesses = 0;
    int totalFailures = 0;
    for (int round = 0; round < numRounds; round++) {
      totalSuccesses += successPerRound.get(round);
      totalFailures += failuresPerRound.get(round);
    }

    assertThat(totalSuccesses)
        .as("Total successes should equal number of rounds")
        .isEqualTo(numRounds);
    assertThat(totalSuccesses + totalFailures)
        .as("All commit attempts should either succeed or fail with CommitFailedException")
        .isEqualTo(totalAttempts);
  }
```
This is what I have locally. I changed it a bit so we can guarantee that
there's exactly one success per round, and made a minor improvement so we
don't commit once per branch creation (all branches are created in a single
commit).
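
To illustrate why the start/end barrier pair gives exactly one success per round, here is a minimal standalone sketch. It is not Iceberg code: an `AtomicInteger` compare-and-set stands in for the catalog's check on the last sequence number, and the class name `BarrierRoundsSketch` and method `runRounds` are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

public class BarrierRoundsSketch {

  /** Returns the number of successful simulated commits recorded in each round. */
  static int[] runRounds(int numThreads, int numRounds) throws Exception {
    // Assumption: a single counter stands in for the table's last sequence number
    AtomicInteger version = new AtomicInteger(0);
    AtomicIntegerArray successPerRound = new AtomicIntegerArray(numRounds);
    CyclicBarrier startBarrier = new CyclicBarrier(numThreads);
    CyclicBarrier endBarrier = new CyclicBarrier(numThreads);

    Callable<Void> worker =
        () -> {
          for (int round = 0; round < numRounds; round++) {
            // "Load" the current state; every thread sees the same value because
            // the previous round's end barrier has already been passed by all threads
            int observed = version.get();

            // No thread attempts its commit until every thread has loaded
            startBarrier.await();

            // Simulated optimistic commit: only the first thread to arrive wins
            if (version.compareAndSet(observed, observed + 1)) {
              successPerRound.incrementAndGet(round);
            }

            // No thread starts the next round until every commit attempt is done
            endBarrier.await();
          }
          return null;
        };

    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
    try {
      List<Future<Void>> futures = new ArrayList<>();
      for (int t = 0; t < numThreads; t++) {
        futures.add(executor.submit(worker));
      }
      for (Future<Void> future : futures) {
        future.get(30, TimeUnit.SECONDS);
      }
    } finally {
      executor.shutdownNow();
    }

    int[] result = new int[numRounds];
    for (int round = 0; round < numRounds; round++) {
      result[round] = successPerRound.get(round);
    }
    return result;
  }

  public static void main(String[] args) throws Exception {
    // prints [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
    System.out.println(Arrays.toString(runRounds(5, 10)));
  }
}
```

The end barrier is what makes the per-round count exact: without it, a fast thread could load the state for the next round before a slower thread has attempted its commit for the current one, so the rounds would overlap and a round could record more than one success.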
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]