nsivabalan commented on code in PR #18828:
URL: https://github.com/apache/hudi/pull/18828#discussion_r3479836848


##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java:
##########
@@ -487,4 +503,221 @@ public void testRollbackWhenFirstCommitFail() {
       client.rollback(newCommitTime);
     }
   }
+
+  /**
+   * Tests that rollback operations generate unique write tokens for log 
files, preventing collisions
+   * during repeated rollback attempts.
+   *
+   * <p>This test validates the fix for write token generation in metadata 
table rollbacks. Previously,
+   * rollback log files used the default UNKNOWN_WRITE_TOKEN ("1-0-1"), 
causing collisions when rollback
+   * was retried. Now, each rollback generates explicit write tokens based on 
Spark task context
+   * (format: {partitionId}-{stageId}-{attemptId}).
+   *
+   * <p>Test flow:
+   * <ol>
+   *   <li>Create initial commit with inserts to establish base files</li>
+   *   <li>Create second commit with updates to generate log files (MOR 
table)</li>
+   *   <li>Backup commit timeline files and marker directory for repeated 
rollback simulation</li>
+   *   <li>Execute first rollback and validate write tokens are NOT 
"1-0-1"</li>
+   *   <li>Restore commit state (timeline files + markers) to simulate 
rollback retry scenario</li>
+   *   <li>Execute second rollback and validate unique write tokens prevent 
collisions</li>
+   *   <li>Verify exactly one new rollback log file per file group from second 
attempt</li>
+   * </ol>
+   *
+   * @param enableFileSliceOptimization tests both with and without file slice 
caching optimization
+   *                                    to ensure write tokens work correctly 
in both code paths
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testRollbackWriteTokenGeneration(boolean enableMetadataTable) 
throws Exception {
+    // 1. Setup: Create a table-version-6 MOR table so the rollback exercises 
RollbackHelperV1
+    //    (which is what this test targets). On v8+ rollbacks delete files 
directly and don't
+    //    produce rollback log files.
+    Properties props = new Properties();
+    props.put(HoodieTableConfig.VERSION.key(), 
HoodieTableVersion.SIX.versionCode());
+    tearDown();
+    initPath();
+    initSparkContexts();
+    dataGen = new HoodieTestDataGenerator(
+        new String[] {DEFAULT_FIRST_PARTITION_PATH, 
DEFAULT_SECOND_PARTITION_PATH});
+    initHoodieStorage();
+    initMetaClient(HoodieTableType.MERGE_ON_READ, props);
+
+    HoodieWriteConfig cfg = getConfigBuilder()
+        .withRollbackUsingMarkers(true)
+        .withMarkersType(MarkerType.DIRECT.name())
+        .withWriteTableVersion(HoodieTableVersion.SIX.versionCode())
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
+        
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0).build())
+        .build();
+
+    HoodieTestDataGenerator.writePartitionMetadataDeprecated(
+        storage, new String[] {DEFAULT_FIRST_PARTITION_PATH}, basePath);
+    FileSystem fs = (FileSystem) storage.getFileSystem();
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+
+    // Write 1: Initial inserts
+    String commitTime1 = "001";
+    WriteClientTestUtils.startCommitWithTime(client, commitTime1);
+    List<HoodieRecord> records = 
dataGen.generateInsertsForPartition(commitTime1, 100, 
DEFAULT_FIRST_PARTITION_PATH);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    List<WriteStatus> statusList = client.upsert(writeRecords, 
commitTime1).collect();
+    Assertions.assertNoWriteErrors(statusList);
+    client.commit(commitTime1, jsc.parallelize(statusList));
+
+    // Write 2: Updates to same partition to create log files. Use multiple 
Spark partitions to
+    // exercise multiple task contexts (so write tokens vary across tasks).
+    String commitTime2 = "002";
+    WriteClientTestUtils.startCommitWithTime(client, commitTime2);
+    List<HoodieRecord> updateRecords = dataGen.generateUpdates(commitTime2, 
records);
+    writeRecords = jsc.parallelize(updateRecords, 2);
+    statusList = client.upsert(writeRecords, commitTime2).collect();
+    Assertions.assertNoWriteErrors(statusList);
+    // Intentionally leave commit 002 in inflight state so rollback exercises 
the inflight path.
+
+    HoodieTable table = this.getHoodieTable(metaClient, cfg);
+    Map<String, List<String>> logFileNames = collectLogFileNamesByFileId(fs, 
DEFAULT_FIRST_PARTITION_PATH);
+    assertFalse(logFileNames.isEmpty());
+
+    // Backup commit 002 timeline files + marker dir so the rollback retry 
below can replay the same input.
+    Path commit2RequestedPath = new Path(metaClient.getMetaPath().toString(),
+        commitTime2 + HoodieTimeline.REQUESTED_DELTA_COMMIT_EXTENSION);
+    Path commit2InflightPath = new Path(metaClient.getMetaPath().toString(),
+        commitTime2 + HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION);
+    Path commit2MarkerDir = new 
Path(metaClient.getMarkerFolderPath(commitTime2));
+    Path backupDir = new Path(basePath, ".backup_test");
+    Path backupMarkerDir = new Path(backupDir, commitTime2);
+    fs.mkdirs(backupDir);
+
+    boolean requestedExists = fs.exists(commit2RequestedPath);
+    boolean inflightExists = fs.exists(commit2InflightPath);
+    boolean markerDirExists = fs.exists(commit2MarkerDir);
+
+    if (requestedExists) {
+      FileUtil.copy(fs, commit2RequestedPath, fs,
+          new Path(backupDir, commitTime2 + 
HoodieTimeline.REQUESTED_DELTA_COMMIT_EXTENSION),
+          false, fs.getConf());
+    }

Review Comment:
   Good catch — addressed in 854d0206 (now part of 1d3ca3986d40 after rebase). 
`FileSlice.getLogFiles()` is sorted highest-version first via the reverse 
comparator, so index 0 is the rollback log file (matches 
`FileSlice.getLatestLogFile()`'s `findFirst`). Switched to `logFiles.get(0)`; 
the previous `size()-1` referenced the original commit-002 log and would have 
passed even without the fix.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.java:
##########
@@ -323,16 +328,30 @@ List<Pair<String, HoodieRollbackStat>> 
maybeDeleteAndCollectStats(HoodieEngineCo
               .withTableVersion(tableVersion)
               .withFileExtension(HoodieLogFile.DELTA_EXTENSION);
 
-          // Supply the pre-computed latest log version and its write token so 
that
-          // WriterBuilder.build() skips the per-request 
FSUtils.getLatestLogVersion() listing.
-          // This produces the same result: build() would discover (N, 
T_existing), construct
-          // path (N, T_existing), find it exists, and roll over to N+1. 
Pre-computation
-          // feeds the same (N, T_existing), triggering the identical rollover 
in getOutputStream().
+          // Apply pre-computed log version if available. Always keep the 
per-task write token
+          // generated above (via CommonClientUtils.generateWriteToken) so 
that retried/repeated
+          // rollbacks do not collide on UNKNOWN_WRITE_TOKEN or inherit a 
prior log's write token.
+          //
+          // When doDelete=true, we actually create a new rollback log file: 
explicitly bump the
+          // version (latest + 1) so the new file is written with the per-task 
write token instead

Review Comment:
   Correct read — when a partition listing fails (IOException caught at line 
153) no sentinels are inserted for any request in that partition, so the 
per-request fallback in `WriterBuilder.build()` kicks in and (for v6) 
re-discovers the existing log's token, overriding the per-task token. That's 
the same exposure this PR fixes for the happy path. Leaving as-is here: the 
listing-failure path is exceptional and matches pre-PR behavior, so it's a 
pre-existing corner case rather than a new regression. Worth a follow-up to 
either re-do the per-request listing inline on null and apply the explicit 
version, or to plumb an override into `WriterBuilder.build()` so v6 honors the 
per-task token even on the fallback path. Open to taking that as a separate PR 
if you'd like.



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/rollback/TestRollbackHelper.java:
##########
@@ -681,7 +680,9 @@ void 
testPreComputeLogVersionsSentinelForMissingFileGroups() throws Exception {
     String missingKey = RollbackHelperV1.logVersionLookupKey(partition, 
"fileId-no-logs", baseInstant);
     assertTrue(result.containsKey(missingKey));
     assertEquals(HoodieLogFile.LOGFILE_BASE_VERSION, (int) 
result.get(missingKey).getLeft());
-    assertEquals(HoodieLogFormat.UNKNOWN_WRITE_TOKEN, 
result.get(missingKey).getRight());
+    // Sentinel entries (no real log file) carry a null write token so they 
cannot be confused
+    // with a real log file that happens to use UNKNOWN_WRITE_TOKEN.
+    assertEquals(null, result.get(missingKey).getRight());

Review Comment:
   Addressed in 854d0206 — switched to 
`assertNull(result.get(missingKey).getRight())`.



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java:
##########
@@ -487,4 +503,221 @@ public void testRollbackWhenFirstCommitFail() {
       client.rollback(newCommitTime);
     }
   }
+
+  /**
+   * Tests that rollback operations generate unique write tokens for log 
files, preventing collisions
+   * during repeated rollback attempts.
+   *
+   * <p>This test validates the fix for write token generation in metadata 
table rollbacks. Previously,
+   * rollback log files used the default UNKNOWN_WRITE_TOKEN ("1-0-1"), 
causing collisions when rollback
+   * was retried. Now, each rollback generates explicit write tokens based on 
Spark task context
+   * (format: {partitionId}-{stageId}-{attemptId}).
+   *
+   * <p>Test flow:
+   * <ol>
+   *   <li>Create initial commit with inserts to establish base files</li>
+   *   <li>Create second commit with updates to generate log files (MOR 
table)</li>
+   *   <li>Backup commit timeline files and marker directory for repeated 
rollback simulation</li>
+   *   <li>Execute first rollback and validate write tokens are NOT 
"1-0-1"</li>
+   *   <li>Restore commit state (timeline files + markers) to simulate 
rollback retry scenario</li>
+   *   <li>Execute second rollback and validate unique write tokens prevent 
collisions</li>
+   *   <li>Verify exactly one new rollback log file per file group from second 
attempt</li>
+   * </ol>

Review Comment:
   Addressed in 854d0206 — the Javadoc now uses `@param enableMetadataTable` with a description that 
matches the actual parameter semantics ("with and without metadata table 
enabled").



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java:
##########
@@ -487,4 +503,223 @@ public void testRollbackWhenFirstCommitFail() {
       client.rollback(newCommitTime);
     }
   }
+
+  /**
+   * Tests that rollback operations generate unique write tokens for log 
files, preventing collisions
+   * during repeated rollback attempts.
+   *
+   * <p>This test validates the fix for write token generation in metadata 
table rollbacks. Previously,
+   * rollback log files used the default UNKNOWN_WRITE_TOKEN ("1-0-1"), 
causing collisions when rollback
+   * was retried. Now, each rollback generates explicit write tokens based on 
Spark task context
+   * (format: {partitionId}-{stageId}-{attemptId}).
+   *
+   * <p>Test flow:
+   * <ol>
+   *   <li>Create initial commit with inserts to establish base files</li>
+   *   <li>Create second commit with updates to generate log files (MOR 
table)</li>
+   *   <li>Backup commit timeline files and marker directory for repeated 
rollback simulation</li>
+   *   <li>Execute first rollback and validate write tokens are NOT 
"1-0-1"</li>
+   *   <li>Restore commit state (timeline files + markers) to simulate 
rollback retry scenario</li>
+   *   <li>Execute second rollback and validate unique write tokens prevent 
collisions</li>
+   *   <li>Verify exactly one new rollback log file per file group from second 
attempt</li>
+   * </ol>
+   *
+   * @param enableMetadataTable runs the test both with and without metadata 
table enabled to
+   *                            ensure write-token generation is correct in 
both code paths
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testRollbackWriteTokenGeneration(boolean enableMetadataTable) 
throws Exception {
+    // 1. Setup: Create a table-version-6 MOR table so the rollback exercises 
RollbackHelperV1
+    //    (which is what this test targets). On v8+ rollbacks delete files 
directly and don't
+    //    produce rollback log files.
+    Properties props = new Properties();
+    props.put(HoodieTableConfig.VERSION.key(), 
HoodieTableVersion.SIX.versionCode());
+    tearDown();
+    initPath();
+    initSparkContexts();
+    dataGen = new HoodieTestDataGenerator(
+        new String[] {DEFAULT_FIRST_PARTITION_PATH, 
DEFAULT_SECOND_PARTITION_PATH});
+    initHoodieStorage();
+    initMetaClient(HoodieTableType.MERGE_ON_READ, props);
+
+    HoodieWriteConfig cfg = getConfigBuilder()
+        .withRollbackUsingMarkers(true)
+        .withMarkersType(MarkerType.DIRECT.name())
+        .withWriteTableVersion(HoodieTableVersion.SIX.versionCode())
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
+        
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0).build())
+        .build();
+
+    HoodieTestDataGenerator.writePartitionMetadataDeprecated(
+        storage, new String[] {DEFAULT_FIRST_PARTITION_PATH}, basePath);
+    FileSystem fs = (FileSystem) storage.getFileSystem();
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+
+    // Write 1: Initial inserts
+    String commitTime1 = "001";
+    WriteClientTestUtils.startCommitWithTime(client, commitTime1);
+    List<HoodieRecord> records = 
dataGen.generateInsertsForPartition(commitTime1, 100, 
DEFAULT_FIRST_PARTITION_PATH);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    List<WriteStatus> statusList = client.upsert(writeRecords, 
commitTime1).collect();
+    Assertions.assertNoWriteErrors(statusList);
+    client.commit(commitTime1, jsc.parallelize(statusList));
+
+    // Write 2: Updates to same partition to create log files. Use multiple 
Spark partitions to
+    // exercise multiple task contexts (so write tokens vary across tasks).
+    String commitTime2 = "002";
+    WriteClientTestUtils.startCommitWithTime(client, commitTime2);
+    List<HoodieRecord> updateRecords = dataGen.generateUpdates(commitTime2, 
records);
+    writeRecords = jsc.parallelize(updateRecords, 2);
+    statusList = client.upsert(writeRecords, commitTime2).collect();
+    Assertions.assertNoWriteErrors(statusList);
+    // Intentionally leave commit 002 in inflight state so rollback exercises 
the inflight path.
+
+    HoodieTable table = this.getHoodieTable(metaClient, cfg);
+    Map<String, List<String>> logFileNames = collectLogFileNamesByFileId(fs, 
DEFAULT_FIRST_PARTITION_PATH);
+    assertFalse(logFileNames.isEmpty());
+
+    // Backup commit 002 timeline files + marker dir so the rollback retry 
below can replay the same input.
+    Path commit2RequestedPath = new Path(metaClient.getMetaPath().toString(),
+        commitTime2 + HoodieTimeline.REQUESTED_DELTA_COMMIT_EXTENSION);
+    Path commit2InflightPath = new Path(metaClient.getMetaPath().toString(),
+        commitTime2 + HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION);
+    Path commit2MarkerDir = new 
Path(metaClient.getMarkerFolderPath(commitTime2));
+    Path backupDir = new Path(basePath, ".backup_test");
+    Path backupMarkerDir = new Path(backupDir, commitTime2);
+    fs.mkdirs(backupDir);
+
+    boolean requestedExists = fs.exists(commit2RequestedPath);
+    boolean inflightExists = fs.exists(commit2InflightPath);
+    boolean markerDirExists = fs.exists(commit2MarkerDir);
+
+    if (requestedExists) {
+      FileUtil.copy(fs, commit2RequestedPath, fs,
+          new Path(backupDir, commitTime2 + 
HoodieTimeline.REQUESTED_DELTA_COMMIT_EXTENSION),
+          false, fs.getConf());
+    }
+    if (inflightExists) {
+      FileUtil.copy(fs, commit2InflightPath, fs,
+          new Path(backupDir, commitTime2 + 
HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION),
+          false, fs.getConf());
+    }
+    if (markerDirExists) {
+      FileUtil.copy(fs, commit2MarkerDir, fs, backupMarkerDir, false, 
fs.getConf());
+    }
+
+    // 3. Rollback commit 002
+    String rollbackTime = "003";
+    HoodieInstant rollBackInstant = INSTANT_GENERATOR.createNewInstant(
+        HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, 
commitTime2);
+    BaseRollbackPlanActionExecutor rollbackPlanExecutor = new 
BaseRollbackPlanActionExecutor(
+        context, cfg, table, rollbackTime, rollBackInstant, false, true, 
false);
+    rollbackPlanExecutor.execute().get();
+
+    MergeOnReadRollbackActionExecutor rollbackExecutor = new 
MergeOnReadRollbackActionExecutor(
+        context, cfg, table, rollbackTime, rollBackInstant, true, false);
+    Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = 
rollbackExecutor.execute().getPartitionMetadata();
+
+    assertEquals(1, rollbackMetadata.size());
+    HoodieRollbackPartitionMetadata partitionMetadata = 
rollbackMetadata.get(DEFAULT_FIRST_PARTITION_PATH);
+    assertFalse(partitionMetadata.getRollbackLogFiles().isEmpty(), "Should 
have rollback log files");
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    table = this.getHoodieTable(metaClient, cfg);
+
+    // Validate write tokens on the rollback log files are per-task generated 
(not UNKNOWN_WRITE_TOKEN "1-0-1").
+    List<FileSlice> rollbackFileSlices = table.getSliceView()
+        .getLatestFileSlices(DEFAULT_FIRST_PARTITION_PATH)
+        .collect(Collectors.toList());
+    // FileSlice.getLogFiles() is sorted highest-version first (reverse 
comparator),
+    // so index 0 is the latest log file produced by the rollback.
+    List<HoodieLogFile> rollbackLogFiles = rollbackFileSlices.stream()
+        .flatMap(slice -> {
+          List<HoodieLogFile> logFiles = 
slice.getLogFiles().collect(Collectors.toList());
+          return Collections.singleton(logFiles.get(0)).stream();
+        })
+        .collect(Collectors.toList());
+
+    assertTrue(rollbackLogFiles.size() > 0, "Should have rollback log files 
with rollback instant time");
+    for (HoodieLogFile logFile : rollbackLogFiles) {
+      String writeToken = logFile.getLogWriteToken();
+      assertFalse(writeToken.isEmpty(), "Write token should not be empty");
+      assertTrue(writeToken.matches("\\d+-\\d+-\\d+"),
+          String.format("Write token should match pattern 
partitionId-stageId-attemptId, but got: %s in file: %s",
+              writeToken, logFile.getFileName()));
+      assertNotEquals("1-0-1", writeToken);
+    }
+
+    Map<String, List<String>> logFileNamesPostRollback = 
collectLogFileNamesByFileId(fs, DEFAULT_FIRST_PARTITION_PATH);
+
+    // Simulate rollback retry: remove rollback timeline files and restore 
commit 002 timeline + markers.
+    HoodieInstant lastRollbackInstant = 
metaClient.getActiveTimeline().getRollbackTimeline().lastInstant().get();
+    String latestRollbackCompletedFileName =
+        INSTANT_FILE_NAME_GENERATOR.getFileName(lastRollbackInstant);
+    fs.delete(new Path(metaClient.getMetaPath().toString(), 
latestRollbackCompletedFileName), false);
+    fs.delete(new Path(metaClient.getMetaPath().toString(),
+        rollbackTime + HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION), false);
+
+    if (requestedExists) {
+      FileUtil.copy(fs, new Path(backupDir, commitTime2 + 
HoodieTimeline.REQUESTED_DELTA_COMMIT_EXTENSION),
+          fs, commit2RequestedPath, false, fs.getConf());
+    }
+    if (inflightExists) {
+      FileUtil.copy(fs, new Path(backupDir, commitTime2 + 
HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION),
+          fs, commit2InflightPath, false, fs.getConf());
+    }
+    if (markerDirExists) {
+      FileUtil.copy(fs, backupMarkerDir, fs, commit2MarkerDir.getParent(), 
false, fs.getConf());
+    }
+    fs.delete(backupDir, true);
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    table = this.getHoodieTable(metaClient, cfg);
+
+    // Trigger second rollback - should create additional rollback log files 
with different write tokens.
+    MergeOnReadRollbackActionExecutor rollbackExecutor2 = new 
MergeOnReadRollbackActionExecutor(
+        context, cfg, table, rollbackTime, rollBackInstant, true, false);
+    Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata2 = 
rollbackExecutor2.execute().getPartitionMetadata();
+
+    assertEquals(1, rollbackMetadata2.size());
+    HoodieRollbackPartitionMetadata partitionMetadata2 = 
rollbackMetadata2.get(DEFAULT_FIRST_PARTITION_PATH);
+    assertFalse(partitionMetadata2.getRollbackLogFiles().isEmpty(), "Should 
have rollback log files");
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    Map<String, List<String>> logFileNamesPost2ndRollback = 
collectLogFileNamesByFileId(fs, DEFAULT_FIRST_PARTITION_PATH);
+    Map<String, Integer> filesFrom2ndRollback = new HashMap<>();
+    logFileNamesPost2ndRollback.forEach((fileId, fileNames) -> {
+      List<String> previousFiles = 
logFileNamesPostRollback.getOrDefault(fileId, Collections.emptyList());
+      for (String fileName : fileNames) {
+        if (!previousFiles.contains(fileName)) {
+          filesFrom2ndRollback.merge(fileId, 1, Integer::sum);
+          assertNotEquals("1-0-1", new 
HoodieLogFile(fileName).getLogWriteToken());
+        }
+      }
+    });
+
+    assertFalse(filesFrom2ndRollback.isEmpty(),
+        "Second rollback should produce at least one new log file (no 
collision with first rollback)");
+    assertEquals(logFileNames.size(), filesFrom2ndRollback.size());
+    filesFrom2ndRollback.forEach((k, v) -> assertEquals(1, v));
+    client.close();
+  }
+
+  /**
+   * Lists all log files in the given partition and groups their file names by 
file ID.
+   */
+  private Map<String, List<String>> collectLogFileNamesByFileId(FileSystem fs, 
String partitionPath) throws IOException {
+    Map<String, List<String>> logFilesByFileId = new HashMap<>();
+    RemoteIterator<LocatedFileStatus> itr = fs.listFiles(
+        new Path(metaClient.getBasePath().toString() + "/" + partitionPath), 
false);
+    while (itr.hasNext()) {
+      FileStatus fileStatus = itr.next();
+      String fileName = fileStatus.getPath().getName();
+      if (fileName.contains("log")) {
+        String fileId = FSUtils.getFileId(fileName);

Review Comment:
   Addressed in 1d3ca3986d40 — switched to `FSUtils.isLogFile(fileName)`.



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java:
##########
@@ -487,4 +503,223 @@ public void testRollbackWhenFirstCommitFail() {
       client.rollback(newCommitTime);
     }
   }
+
+  /**
+   * Tests that rollback operations generate unique write tokens for log 
files, preventing collisions
+   * during repeated rollback attempts.
+   *
+   * <p>This test validates the fix for write token generation in metadata 
table rollbacks. Previously,
+   * rollback log files used the default UNKNOWN_WRITE_TOKEN ("1-0-1"), 
causing collisions when rollback
+   * was retried. Now, each rollback generates explicit write tokens based on 
Spark task context
+   * (format: {partitionId}-{stageId}-{attemptId}).
+   *
+   * <p>Test flow:
+   * <ol>
+   *   <li>Create initial commit with inserts to establish base files</li>
+   *   <li>Create second commit with updates to generate log files (MOR 
table)</li>
+   *   <li>Backup commit timeline files and marker directory for repeated 
rollback simulation</li>
+   *   <li>Execute first rollback and validate write tokens are NOT 
"1-0-1"</li>
+   *   <li>Restore commit state (timeline files + markers) to simulate 
rollback retry scenario</li>
+   *   <li>Execute second rollback and validate unique write tokens prevent 
collisions</li>
+   *   <li>Verify exactly one new rollback log file per file group from second 
attempt</li>
+   * </ol>
+   *
+   * @param enableMetadataTable runs the test both with and without metadata 
table enabled to
+   *                            ensure write-token generation is correct in 
both code paths
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testRollbackWriteTokenGeneration(boolean enableMetadataTable) 
throws Exception {
+    // 1. Setup: Create a table-version-6 MOR table so the rollback exercises 
RollbackHelperV1
+    //    (which is what this test targets). On v8+ rollbacks delete files 
directly and don't
+    //    produce rollback log files.
+    Properties props = new Properties();
+    props.put(HoodieTableConfig.VERSION.key(), 
HoodieTableVersion.SIX.versionCode());
+    tearDown();
+    initPath();
+    initSparkContexts();
+    dataGen = new HoodieTestDataGenerator(
+        new String[] {DEFAULT_FIRST_PARTITION_PATH, 
DEFAULT_SECOND_PARTITION_PATH});
+    initHoodieStorage();
+    initMetaClient(HoodieTableType.MERGE_ON_READ, props);
+
+    HoodieWriteConfig cfg = getConfigBuilder()
+        .withRollbackUsingMarkers(true)
+        .withMarkersType(MarkerType.DIRECT.name())
+        .withWriteTableVersion(HoodieTableVersion.SIX.versionCode())
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
+        
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0).build())
+        .build();
+
+    HoodieTestDataGenerator.writePartitionMetadataDeprecated(
+        storage, new String[] {DEFAULT_FIRST_PARTITION_PATH}, basePath);
+    FileSystem fs = (FileSystem) storage.getFileSystem();
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+
+    // Write 1: Initial inserts
+    String commitTime1 = "001";
+    WriteClientTestUtils.startCommitWithTime(client, commitTime1);
+    List<HoodieRecord> records = 
dataGen.generateInsertsForPartition(commitTime1, 100, 
DEFAULT_FIRST_PARTITION_PATH);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    List<WriteStatus> statusList = client.upsert(writeRecords, 
commitTime1).collect();
+    Assertions.assertNoWriteErrors(statusList);
+    client.commit(commitTime1, jsc.parallelize(statusList));
+
+    // Write 2: Updates to same partition to create log files. Use multiple 
Spark partitions to
+    // exercise multiple task contexts (so write tokens vary across tasks).
+    String commitTime2 = "002";
+    WriteClientTestUtils.startCommitWithTime(client, commitTime2);
+    List<HoodieRecord> updateRecords = dataGen.generateUpdates(commitTime2, 
records);
+    writeRecords = jsc.parallelize(updateRecords, 2);
+    statusList = client.upsert(writeRecords, commitTime2).collect();
+    Assertions.assertNoWriteErrors(statusList);
+    // Intentionally leave commit 002 in inflight state so rollback exercises 
the inflight path.
+
+    HoodieTable table = this.getHoodieTable(metaClient, cfg);
+    Map<String, List<String>> logFileNames = collectLogFileNamesByFileId(fs, 
DEFAULT_FIRST_PARTITION_PATH);
+    assertFalse(logFileNames.isEmpty());
+
+    // Backup commit 002 timeline files + marker dir so the rollback retry 
below can replay the same input.
+    Path commit2RequestedPath = new Path(metaClient.getMetaPath().toString(),
+        commitTime2 + HoodieTimeline.REQUESTED_DELTA_COMMIT_EXTENSION);
+    Path commit2InflightPath = new Path(metaClient.getMetaPath().toString(),
+        commitTime2 + HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION);
+    Path commit2MarkerDir = new 
Path(metaClient.getMarkerFolderPath(commitTime2));
+    Path backupDir = new Path(basePath, ".backup_test");
+    Path backupMarkerDir = new Path(backupDir, commitTime2);
+    fs.mkdirs(backupDir);
+
+    boolean requestedExists = fs.exists(commit2RequestedPath);
+    boolean inflightExists = fs.exists(commit2InflightPath);
+    boolean markerDirExists = fs.exists(commit2MarkerDir);
+
+    if (requestedExists) {
+      FileUtil.copy(fs, commit2RequestedPath, fs,
+          new Path(backupDir, commitTime2 + 
HoodieTimeline.REQUESTED_DELTA_COMMIT_EXTENSION),
+          false, fs.getConf());
+    }
+    if (inflightExists) {
+      FileUtil.copy(fs, commit2InflightPath, fs,
+          new Path(backupDir, commitTime2 + 
HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION),
+          false, fs.getConf());
+    }
+    if (markerDirExists) {
+      FileUtil.copy(fs, commit2MarkerDir, fs, backupMarkerDir, false, 
fs.getConf());
+    }
+
+    // 3. Rollback commit 002
+    String rollbackTime = "003";
+    HoodieInstant rollBackInstant = INSTANT_GENERATOR.createNewInstant(
+        HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, 
commitTime2);
+    BaseRollbackPlanActionExecutor rollbackPlanExecutor = new 
BaseRollbackPlanActionExecutor(
+        context, cfg, table, rollbackTime, rollBackInstant, false, true, 
false);
+    rollbackPlanExecutor.execute().get();
+
+    MergeOnReadRollbackActionExecutor rollbackExecutor = new 
MergeOnReadRollbackActionExecutor(
+        context, cfg, table, rollbackTime, rollBackInstant, true, false);
+    Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = 
rollbackExecutor.execute().getPartitionMetadata();
+
+    assertEquals(1, rollbackMetadata.size());
+    HoodieRollbackPartitionMetadata partitionMetadata = 
rollbackMetadata.get(DEFAULT_FIRST_PARTITION_PATH);
+    assertFalse(partitionMetadata.getRollbackLogFiles().isEmpty(), "Should 
have rollback log files");
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    table = this.getHoodieTable(metaClient, cfg);
+
+    // Validate write tokens on the rollback log files are per-task generated 
(not UNKNOWN_WRITE_TOKEN "1-0-1").
+    List<FileSlice> rollbackFileSlices = table.getSliceView()
+        .getLatestFileSlices(DEFAULT_FIRST_PARTITION_PATH)
+        .collect(Collectors.toList());
+    // FileSlice.getLogFiles() is sorted highest-version first (reverse 
comparator),
+    // so index 0 is the latest log file produced by the rollback.
+    List<HoodieLogFile> rollbackLogFiles = rollbackFileSlices.stream()
+        .flatMap(slice -> {
+          List<HoodieLogFile> logFiles = 
slice.getLogFiles().collect(Collectors.toList());
+          return Collections.singleton(logFiles.get(0)).stream();
+        })
+        .collect(Collectors.toList());
+
+    assertTrue(rollbackLogFiles.size() > 0, "Should have rollback log files 
with rollback instant time");
+    for (HoodieLogFile logFile : rollbackLogFiles) {

Review Comment:
   Addressed in 1d3ca3986d40 — switched to 
`assertFalse(rollbackLogFiles.isEmpty(), ...)` for consistency with the 
surrounding `assertFalse(...isEmpty(), ...)` checks on `partitionMetadata` and `partitionMetadata2`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to