amogh-jahagirdar commented on code in PR #15126:
URL: https://github.com/apache/iceberg/pull/15126#discussion_r2785247722


##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3606,6 +3614,119 @@ protected RESTSessionCatalog newSessionCatalog(
     return catalog;
   }
 
+  /**
+   * Test concurrent appends on multiple branches simultaneously to verify proper handling of
+   * sequence number conflicts.
+   *
+   * <p>Uses a barrier to synchronize threads so they all load the same table state and commit
+   * simultaneously, creating deterministic conflicts. With retries disabled, we can verify exact
+   * counts: one success per round, and (numBranches - 1) CommitFailedExceptions per round.

Review Comment:
   It looks like we don't verify below that there is exactly one success per
round (we only check for at least one). I think this is due to the Awaitility
usage and the current barrier setup.



##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3606,6 +3614,119 @@ protected RESTSessionCatalog newSessionCatalog(
     return catalog;
   }
 
+  /**
+   * Test concurrent appends on multiple branches simultaneously to verify proper handling of
+   * sequence number conflicts.
+   *
+   * <p>Uses a barrier to synchronize threads so they all load the same table state and commit
+   * simultaneously, creating deterministic conflicts. With retries disabled, we can verify exact
+   * counts: one success per round, and (numBranches - 1) CommitFailedExceptions per round.

Review Comment:
   ```
     /**
      * Test concurrent appends on multiple branches simultaneously to verify proper handling of
      * sequence number conflicts.
      *
      * <p>Uses CyclicBarrier to synchronize threads so they all load the same table state and commit
      * simultaneously, creating deterministic conflicts. With retries disabled, we can verify exact
      * counts: one success per round, and (numBranches - 1) CommitFailedExceptions per round.
      *
      * <p>This verifies that: 1. Sequence number conflicts are caught by AssertLastSequenceNumber
      * requirement 2. Conflicts result in CommitFailedException (retryable) not ValidationException
      * (non-retryable) 3. The REST catalog properly handles concurrent modifications across branches
      */
     @Test
     public void testConcurrentAppendsOnMultipleBranches() {
       int numBranches = 5;
       int numRounds = 10;
   
       RESTCatalog catalog = catalog();
   
       Namespace ns = Namespace.of("concurrent_test");
       TableIdentifier tableIdent = TableIdentifier.of(ns, "test_table");
   
       catalog.createNamespace(ns);
       Table table = catalog.buildTable(tableIdent, SCHEMA).withPartitionSpec(SPEC).create();
   
       // Disable retries so we can count exact conflicts
       table.updateProperties().set(TableProperties.COMMIT_NUM_RETRIES, "-1").commit();
   
       // Create branches from the main branch
       String[] branchNames = new String[numBranches];
       ManageSnapshots manageSnapshots = table.manageSnapshots();
       for (int i = 0; i < numBranches; i++) {
         branchNames[i] = "branch-" + i;
         manageSnapshots = manageSnapshots.createBranch(branchNames[i]);
       }
   
       manageSnapshots.commit();
   
       CyclicBarrier startBarrier = new CyclicBarrier(numBranches);
       CyclicBarrier endBarrier = new CyclicBarrier(numBranches);
       AtomicIntegerArray successPerRound = new AtomicIntegerArray(numRounds);
       AtomicIntegerArray failuresPerRound = new AtomicIntegerArray(numRounds);
       AtomicInteger validationFailureCount = new AtomicInteger(0);
   
       ExecutorService executor =
           MoreExecutors.getExitingExecutorService(
               (ThreadPoolExecutor) Executors.newFixedThreadPool(numBranches));
   
       Tasks.range(numBranches)
           .stopOnFailure()
           .throwFailureWhenFinished()
           .executeWith(executor)
           .run(
               branchIdx -> {
                 String branchName = branchNames[branchIdx];
   
                 for (int round = 0; round < numRounds; round++) {
                   // Load table
                   Table localTable = catalog.loadTable(tableIdent);
   
                   DataFile newFile =
                       DataFiles.builder(SPEC)
                           .withPath(
                               String.format("/path/to/branch-%d-round-%d.parquet", branchIdx, round))
                           .withFileSizeInBytes(15)
                           .withPartitionPath(String.format("id_bucket=%d", branchIdx % 16))
                           .withRecordCount(3)
                           .build();
   
                   try {
                     // Wait for all threads to be ready to commit
                     startBarrier.await();
   
                     // All threads commit simultaneously
                     localTable.newFastAppend().appendFile(newFile).toBranch(branchName).commit();
                     successPerRound.incrementAndGet(round);
                   } catch (CommitFailedException e) {
                     // Expected for conflicts - this is the correct behavior with the fix
                     failuresPerRound.incrementAndGet(round);
                   } catch (ValidationException e) {
                     // This indicates the fix is not working
                     validationFailureCount.incrementAndGet();
                     throw e;
                   } catch (Exception e) {
                     // Handle barrier exceptions
                     throw new RuntimeException(e);
                   }
   
                   try {
                     // Ensure all threads complete this round before starting the next
                     endBarrier.await();
                   } catch (Exception e) {
                     throw new RuntimeException(e);
                   }
                 }
               });
   
       executor.shutdown();
   
       int totalAttempts = numBranches * numRounds;
   
       // Verify: no ValidationException should have been thrown
       assertThat(validationFailureCount.get())
           .as(
               "Sequence number conflicts should throw CommitFailedException (retryable), "
                   + "not ValidationException")
           .isEqualTo(0);
   
       // Verify exactly one success and (numBranches - 1) failures in every round
       for (int round = 0; round < numRounds; round++) {
         assertThat(successPerRound.get(round))
             .as("Exactly one commit should succeed in round " + round)
             .isEqualTo(1);
         assertThat(failuresPerRound.get(round))
             .as("Exactly (numBranches - 1) commits should fail in round " + round)
             .isEqualTo(numBranches - 1);
       }
   
       // Verify no attempts were lost
       int totalSuccesses = 0;
       int totalFailures = 0;
       for (int round = 0; round < numRounds; round++) {
         totalSuccesses += successPerRound.get(round);
         totalFailures += failuresPerRound.get(round);
       }
   
       assertThat(totalSuccesses)
           .as("Total successes should equal number of rounds")
           .isEqualTo(numRounds);
       assertThat(totalSuccesses + totalFailures)
           .as("All commit attempts should either succeed or fail with CommitFailedException")
           .isEqualTo(totalAttempts);
     }
   
   ```
   
   This is what I have locally. I changed it a bit so we can guarantee exactly
one success per round, and made some minor improvements so we don't commit on
every branch creation (all branches can be created in a single commit).
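
For anyone following the thread, the two-barrier round pattern can be sketched in isolation. This is a minimal, hypothetical illustration and not Iceberg code: an `AtomicInteger` compare-and-set stands in for the catalog commit, and the names `TwoBarrierRounds` and `run` are made up for the example. The start barrier ensures every worker has read the same state before anyone commits; the end barrier ensures nobody reloads for the next round until all commits of the current round have finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical stand-alone sketch; the "table" is just a version counter.
class TwoBarrierRounds {

  /** Runs numRounds synchronized rounds and returns the success count of each round. */
  static int[] run(int numWorkers, int numRounds) throws Exception {
    AtomicInteger version = new AtomicInteger(0); // stand-in for the table state
    CyclicBarrier startBarrier = new CyclicBarrier(numWorkers);
    CyclicBarrier endBarrier = new CyclicBarrier(numWorkers);
    AtomicIntegerArray successPerRound = new AtomicIntegerArray(numRounds);

    ExecutorService executor = Executors.newFixedThreadPool(numWorkers);
    List<Future<?>> futures = new ArrayList<>();
    try {
      for (int w = 0; w < numWorkers; w++) {
        futures.add(
            executor.submit(
                () -> {
                  for (int round = 0; round < numRounds; round++) {
                    int seen = version.get(); // "load table"
                    startBarrier.await(); // all workers have loaded the same state
                    if (version.compareAndSet(seen, seen + 1)) { // "commit"
                      successPerRound.incrementAndGet(round);
                    }
                    endBarrier.await(); // no reload until every commit attempt is done
                  }
                  return null; // Callable, so await() may throw checked exceptions
                }));
      }
      for (Future<?> future : futures) {
        future.get(); // propagate any worker failure
      }
    } finally {
      executor.shutdownNow();
    }

    int[] result = new int[numRounds];
    for (int round = 0; round < numRounds; round++) {
      result[round] = successPerRound.get(round);
    }
    return result;
  }

  public static void main(String[] args) throws Exception {
    for (int count : run(5, 10)) {
      System.out.println(count);
    }
  }
}
```

Because every worker reads the version before any worker is released to commit, only the first compare-and-set in a round can match, which is what makes an exact "one success per round" assertion safe in this simplified setting.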




