amogh-jahagirdar commented on code in PR #15126:
URL: https://github.com/apache/iceberg/pull/15126#discussion_r2900313343
##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3712,6 +3723,139 @@ protected RESTSessionCatalog newSessionCatalog(
return catalog;
}
+ /**
+ * Test concurrent appends on multiple branches simultaneously to verify proper handling of
+ * sequence number conflicts.
+ *
+ * <p>Uses a barrier to synchronize threads so they all load the same table state and commit
+ * simultaneously, creating deterministic conflicts. With retries disabled, we can verify exact
+ * counts: one success per round, and (numBranches - 1) CommitFailedExceptions per round.
+ *
+ * <p>This verifies that: 1. Sequence number conflicts are caught by AssertLastSequenceNumber
+ * requirement 2. Conflicts result in CommitFailedException (retryable) not ValidationException
+ * (non-retryable) 3. The REST catalog properly handles concurrent modifications across branches
+ */
+ @Test
+ public void testConcurrentAppendsOnMultipleBranches() {
Review Comment:
I do think, though, that we should have two unit tests in TestTableMetadata:
one asserting that a RetryableValidationException is thrown when row ID
assignment validation fails, and one for the stale sequence number case. The
test in TestRESTCatalog then asserts what a real client would see in these
cases (a CommitFailedException).
These are the tests I have for TestTableMetadata:
```
@Test
public void testAddSnapshotWithStaleSequenceNumberIsRetryable() {
  TableMetadata base =
      TableMetadata.newTableMetadata(
          TEST_SCHEMA, PartitionSpec.unpartitioned(), "location", ImmutableMap.of());

  // Advance lastSequenceNumber to 1 by adding a root snapshot
  Snapshot s1 =
      new BaseSnapshot(
          1, 1L, null, System.currentTimeMillis(), null, null, null,
          "file:/s1.avro", null, null, null);
  TableMetadata withS1 = TableMetadata.buildFrom(base).addSnapshot(s1).build();

  // A snapshot with seqNum=1 and non-null parentId is stale
  // (1 is not > lastSequenceNumber=1)
  Snapshot staleSnapshot =
      new BaseSnapshot(
          1, 2L, 1L, System.currentTimeMillis(), null, null, null,
          "file:/s2.avro", null, null, null);

  assertThatThrownBy(() -> TableMetadata.buildFrom(withS1).addSnapshot(staleSnapshot))
      .isInstanceOf(RetryableValidationException.class)
      .hasMessageContaining("Cannot add snapshot with sequence number");
}

@Test
public void testAddSnapshotWithStaleFirstRowIdIsRetryable() {
  TableMetadata base =
      TableMetadata.newTableMetadata(
          TEST_SCHEMA, PartitionSpec.unpartitioned(), "location",
          ImmutableMap.of(TableProperties.FORMAT_VERSION, "3"));

  // Advance nextRowId to 5 by adding a snapshot that allocates 5 rows
  Snapshot s1 =
      new BaseSnapshot(
          1, 1L, null, System.currentTimeMillis(), null, null, null,
          "file:/s1.avro", 0L, 5L, null);
  TableMetadata withS1 = TableMetadata.buildFrom(base).addSnapshot(s1).build();
  // nextRowId is now 5

  // A snapshot with firstRowId=0 is stale (0 < nextRowId=5)
  Snapshot staleSnapshot =
      new BaseSnapshot(
          2, 2L, 1L, System.currentTimeMillis(), null, null, null,
          "file:/s2.avro", 0L, 1L, null);

  assertThatThrownBy(() -> TableMetadata.buildFrom(withS1).addSnapshot(staleSnapshot))
      .isInstanceOf(RetryableValidationException.class)
      .hasMessageContaining(
          "Cannot add a snapshot, first-row-id is behind table next-row-id");
}
```
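To make the split between the two layers concrete, here is a minimal,
self-contained sketch of how a retryable validation failure at the metadata
layer could surface to the client as a commit failure. It does not use the real
Iceberg classes: the exception types are stand-ins that only borrow the names
discussed above, the subclass relationship between them is an assumption, and
`addSnapshot` and `commit` are hypothetical helpers invented for illustration.

```java
public class RetryableMappingSketch {
  // Stand-ins for the exception types discussed above; NOT the real Iceberg classes.
  static class ValidationException extends RuntimeException {
    ValidationException(String message) {
      super(message);
    }
  }

  // Assumption for this sketch: the retryable variant is a subclass of ValidationException.
  static class RetryableValidationException extends ValidationException {
    RetryableValidationException(String message) {
      super(message);
    }
  }

  static class CommitFailedException extends RuntimeException {
    CommitFailedException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  // Hypothetical metadata-layer check: a new snapshot must advance the sequence number.
  static long addSnapshot(long lastSequenceNumber, long snapshotSequenceNumber) {
    if (snapshotSequenceNumber <= lastSequenceNumber) {
      throw new RetryableValidationException(
          "Cannot add snapshot with sequence number "
              + snapshotSequenceNumber
              + " that does not exceed last sequence number "
              + lastSequenceNumber);
    }
    return snapshotSequenceNumber;
  }

  // Hypothetical commit path: only the retryable failure is translated into a
  // CommitFailedException; any other exception would propagate unchanged.
  static long commit(long lastSequenceNumber, long snapshotSequenceNumber) {
    try {
      return addSnapshot(lastSequenceNumber, snapshotSequenceNumber);
    } catch (RetryableValidationException e) {
      throw new CommitFailedException("Validation failed, please retry", e);
    }
  }

  public static void main(String[] args) {
    System.out.println("fresh commit -> sequence number " + commit(1L, 2L));
    try {
      commit(1L, 1L);
    } catch (CommitFailedException e) {
      System.out.println("stale commit -> " + e.getMessage());
    }
  }
}
```

The unit tests above cover the inner check, while the REST catalog test covers
what comes out of the outer translation.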
##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3712,6 +3723,139 @@ protected RESTSessionCatalog newSessionCatalog(
return catalog;
}
+ /**
+ * Test concurrent appends on multiple branches simultaneously to verify proper handling of
+ * sequence number conflicts.
+ *
+ * <p>Uses a barrier to synchronize threads so they all load the same table state and commit
+ * simultaneously, creating deterministic conflicts. With retries disabled, we can verify exact
+ * counts: one success per round, and (numBranches - 1) CommitFailedExceptions per round.
+ *
+ * <p>This verifies that: 1. Sequence number conflicts are caught by AssertLastSequenceNumber
+ * requirement 2. Conflicts result in CommitFailedException (retryable) not ValidationException
+ * (non-retryable) 3. The REST catalog properly handles concurrent modifications across branches
+ */
+ @Test
+ public void testConcurrentAppendsOnMultipleBranches() {
Review Comment:
This is my bad. I suggested this test originally because I wanted something
more realistic, but after looking with fresh eyes it is more complex than it's
worth. In essence, what we want to verify is that when a sequence number
conflicts, the client gets back a CommitFailedException and not a
ValidationException. We don't need N commits on different branches, or
assertions on how many retries really happened, to prove that. The trick is
how to simulate a single concurrent backend commit to another branch that
advances the sequence number. We can do that with a spy on the adapter (which
we already do in the other tests) that invokes the fake concurrent commit when
an actual commit comes in. Here is what I have:
```
@Test
public void testSequenceNumberConflictThrowsCommitFailed() {
  RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
  RESTCatalog catalog = catalog(adapter);

  catalog.createNamespace(TABLE.namespace());
  catalog.buildTable(TABLE, SCHEMA).withPartitionSpec(SPEC).create();

  DataFile fileOnMain =
      DataFiles.builder(SPEC)
          .withPath("/path/commit-test-file1.parquet")
          .withFileSizeInBytes(10)
          .withPartitionPath("id_bucket=0")
          .withRecordCount(1)
          .build();
  catalog.loadTable(TABLE).newFastAppend().appendFile(fileOnMain).commit();

  DataFile fileOnAnotherBranch =
      DataFiles.builder(SPEC)
          .withPath("/path/commit-test-conflicting.parquet")
          .withFileSizeInBytes(10)
          .withPartitionPath("id_bucket=0")
          .withRecordCount(1)
          .build();

  // Before the next commit is processed by the server, advance the server's
  // lastSequenceNumber by committing to a different branch. This simulates a
  // concurrent request to a different branch that "beats" the commit to main.
  Mockito.doAnswer(
          invocation -> {
            backendCatalog
                .loadTable(TABLE)
                .newFastAppend()
                .appendFile(fileOnAnotherBranch)
                .toBranch("other")
                .commit();
            return invocation.callRealMethod();
          })
      .when(adapter)
      .execute(matches(HTTPMethod.POST, RESOURCE_PATHS.table(TABLE)), any(), any(), any());

  DataFile anotherFileOnMain =
      DataFiles.builder(SPEC)
          .withPath("/path/commit-test-file2.parquet")
          .withFileSizeInBytes(10)
          .withPartitionPath("id_bucket=0")
          .withRecordCount(1)
          .build();

  assertThatThrownBy(
          () ->
              catalog.loadTable(TABLE).newFastAppend().appendFile(anotherFileOnMain).commit())
      .isInstanceOf(CommitFailedException.class)
      .hasMessageContaining("Validation failed, please retry");
}
```
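For anyone who wants to see the interleaving without Mockito or a catalog in
the way, the same idea can be sketched in plain Java. Everything here is
hypothetical and greatly simplified: `Backend` stands in for the server-side
catalog with one table-wide sequence number shared by all branches, `Adapter`
plays the role of the spied adapter, and `beforeCommit` is the hook that fires
the fake concurrent commit. None of these are Iceberg APIs.

```java
public class ConcurrentCommitSketch {
  // Stand-in for the client-visible failure; NOT the real Iceberg class.
  static class CommitFailedException extends RuntimeException {
    CommitFailedException(String message) {
      super(message);
    }
  }

  // Hypothetical backend: one sequence number shared by every branch of the table.
  static class Backend {
    private long lastSequenceNumber = 0L;

    synchronized long lastSequenceNumber() {
      return lastSequenceNumber;
    }

    // The commit succeeds only if the sequence number the client saw is still current.
    synchronized long commit(String branch, long expectedLastSequenceNumber) {
      if (expectedLastSequenceNumber != lastSequenceNumber) {
        throw new CommitFailedException("Validation failed, please retry");
      }
      lastSequenceNumber += 1;
      return lastSequenceNumber;
    }
  }

  // Hypothetical adapter: runs a hook before delegating, like the doAnswer spy above.
  static class Adapter {
    private final Backend backend;
    Runnable beforeCommit = () -> {};

    Adapter(Backend backend) {
      this.backend = backend;
    }

    long commit(String branch, long expectedLastSequenceNumber) {
      beforeCommit.run();
      return backend.commit(branch, expectedLastSequenceNumber);
    }
  }

  // Returns true if the commit to main is rejected after the hook commits to "other".
  static boolean conflictDetected() {
    Backend backend = new Backend();
    Adapter adapter = new Adapter(backend);

    // First commit to main goes through and advances the sequence number to 1.
    adapter.commit("main", backend.lastSequenceNumber());

    // The client loads the table state it will base its next commit on.
    long seenByClient = backend.lastSequenceNumber();

    // Just before the next request reaches the backend, another branch commits directly.
    adapter.beforeCommit = () -> backend.commit("other", backend.lastSequenceNumber());

    try {
      adapter.commit("main", seenByClient);
      return false;
    } catch (CommitFailedException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("conflict detected: " + conflictDetected());
  }
}
```

One hook firing on one commit is enough to force the stale state, which is why
the barrier and the per-round counting in the original test are not needed.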
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]