hudi-agent commented on code in PR #18828:
URL: https://github.com/apache/hudi/pull/18828#discussion_r3430995061
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java:
##########
@@ -487,4 +503,223 @@ public void testRollbackWhenFirstCommitFail() {
client.rollback(newCommitTime);
}
}
+
+ /**
+ * Tests that rollback operations generate unique write tokens for log
files, preventing collisions
+ * during repeated rollback attempts.
+ *
+ * <p>This test validates the fix for write token generation in metadata
table rollbacks. Previously,
+ * rollback log files used the default UNKNOWN_WRITE_TOKEN ("1-0-1"),
causing collisions when rollback
+ * was retried. Now, each rollback generates explicit write tokens based on
Spark task context
+ * (format: {partitionId}-{stageId}-{attemptId}).
+ *
+ * <p>Test flow:
+ * <ol>
+ * <li>Create initial commit with inserts to establish base files</li>
+ * <li>Create second commit with updates to generate log files (MOR
table)</li>
+ * <li>Backup commit timeline files and marker directory for repeated
rollback simulation</li>
+ * <li>Execute first rollback and validate write tokens are NOT
"1-0-1"</li>
+ * <li>Restore commit state (timeline files + markers) to simulate
rollback retry scenario</li>
+ * <li>Execute second rollback and validate unique write tokens prevent
collisions</li>
+ * <li>Verify exactly one new rollback log file per file group from second
attempt</li>
+ * </ol>
+ *
+ * @param enableMetadataTable runs the test both with and without metadata
table enabled to
+ * ensure write-token generation is correct in
both code paths
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testRollbackWriteTokenGeneration(boolean enableMetadataTable)
throws Exception {
+ // 1. Setup: Create a table-version-6 MOR table so the rollback exercises
RollbackHelperV1
+ // (which is what this test targets). On v8+ rollbacks delete files
directly and don't
+ // produce rollback log files.
+ Properties props = new Properties();
+ props.put(HoodieTableConfig.VERSION.key(),
HoodieTableVersion.SIX.versionCode());
+ tearDown();
+ initPath();
+ initSparkContexts();
+ dataGen = new HoodieTestDataGenerator(
+ new String[] {DEFAULT_FIRST_PARTITION_PATH,
DEFAULT_SECOND_PARTITION_PATH});
+ initHoodieStorage();
+ initMetaClient(HoodieTableType.MERGE_ON_READ, props);
+
+ HoodieWriteConfig cfg = getConfigBuilder()
+ .withRollbackUsingMarkers(true)
+ .withMarkersType(MarkerType.DIRECT.name())
+ .withWriteTableVersion(HoodieTableVersion.SIX.versionCode())
+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
+
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0).build())
+ .build();
+
+ HoodieTestDataGenerator.writePartitionMetadataDeprecated(
+ storage, new String[] {DEFAULT_FIRST_PARTITION_PATH}, basePath);
+ FileSystem fs = (FileSystem) storage.getFileSystem();
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+
+ // Write 1: Initial inserts
+ String commitTime1 = "001";
+ WriteClientTestUtils.startCommitWithTime(client, commitTime1);
+ List<HoodieRecord> records =
dataGen.generateInsertsForPartition(commitTime1, 100,
DEFAULT_FIRST_PARTITION_PATH);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ List<WriteStatus> statusList = client.upsert(writeRecords,
commitTime1).collect();
+ Assertions.assertNoWriteErrors(statusList);
+ client.commit(commitTime1, jsc.parallelize(statusList));
+
+ // Write 2: Updates to same partition to create log files. Use multiple
Spark partitions to
+ // exercise multiple task contexts (so write tokens vary across tasks).
+ String commitTime2 = "002";
+ WriteClientTestUtils.startCommitWithTime(client, commitTime2);
+ List<HoodieRecord> updateRecords = dataGen.generateUpdates(commitTime2,
records);
+ writeRecords = jsc.parallelize(updateRecords, 2);
+ statusList = client.upsert(writeRecords, commitTime2).collect();
+ Assertions.assertNoWriteErrors(statusList);
+ // Intentionally leave commit 002 in inflight state so rollback exercises
the inflight path.
+
+ HoodieTable table = this.getHoodieTable(metaClient, cfg);
+ Map<String, List<String>> logFileNames = collectLogFileNamesByFileId(fs,
DEFAULT_FIRST_PARTITION_PATH);
+ assertFalse(logFileNames.isEmpty());
+
+ // Backup commit 002 timeline files + marker dir so the rollback retry
below can replay the same input.
+ Path commit2RequestedPath = new Path(metaClient.getMetaPath().toString(),
+ commitTime2 + HoodieTimeline.REQUESTED_DELTA_COMMIT_EXTENSION);
+ Path commit2InflightPath = new Path(metaClient.getMetaPath().toString(),
+ commitTime2 + HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION);
+ Path commit2MarkerDir = new
Path(metaClient.getMarkerFolderPath(commitTime2));
+ Path backupDir = new Path(basePath, ".backup_test");
+ Path backupMarkerDir = new Path(backupDir, commitTime2);
+ fs.mkdirs(backupDir);
+
+ boolean requestedExists = fs.exists(commit2RequestedPath);
+ boolean inflightExists = fs.exists(commit2InflightPath);
+ boolean markerDirExists = fs.exists(commit2MarkerDir);
+
+ if (requestedExists) {
+ FileUtil.copy(fs, commit2RequestedPath, fs,
+ new Path(backupDir, commitTime2 +
HoodieTimeline.REQUESTED_DELTA_COMMIT_EXTENSION),
+ false, fs.getConf());
+ }
+ if (inflightExists) {
+ FileUtil.copy(fs, commit2InflightPath, fs,
+ new Path(backupDir, commitTime2 +
HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION),
+ false, fs.getConf());
+ }
+ if (markerDirExists) {
+ FileUtil.copy(fs, commit2MarkerDir, fs, backupMarkerDir, false,
fs.getConf());
+ }
+
+ // 3. Rollback commit 002
+ String rollbackTime = "003";
+ HoodieInstant rollBackInstant = INSTANT_GENERATOR.createNewInstant(
+ HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION,
commitTime2);
+ BaseRollbackPlanActionExecutor rollbackPlanExecutor = new
BaseRollbackPlanActionExecutor(
+ context, cfg, table, rollbackTime, rollBackInstant, false, true,
false);
+ rollbackPlanExecutor.execute().get();
+
+ MergeOnReadRollbackActionExecutor rollbackExecutor = new
MergeOnReadRollbackActionExecutor(
+ context, cfg, table, rollbackTime, rollBackInstant, true, false);
+ Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata =
rollbackExecutor.execute().getPartitionMetadata();
+
+ assertEquals(1, rollbackMetadata.size());
+ HoodieRollbackPartitionMetadata partitionMetadata =
rollbackMetadata.get(DEFAULT_FIRST_PARTITION_PATH);
+ assertFalse(partitionMetadata.getRollbackLogFiles().isEmpty(), "Should
have rollback log files");
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = this.getHoodieTable(metaClient, cfg);
+
+ // Validate write tokens on the rollback log files are per-task generated
(not UNKNOWN_WRITE_TOKEN "1-0-1").
+ List<FileSlice> rollbackFileSlices = table.getSliceView()
+ .getLatestFileSlices(DEFAULT_FIRST_PARTITION_PATH)
+ .collect(Collectors.toList());
+ // FileSlice.getLogFiles() is sorted highest-version first (reverse
comparator),
+ // so index 0 is the latest log file produced by the rollback.
+ List<HoodieLogFile> rollbackLogFiles = rollbackFileSlices.stream()
+ .flatMap(slice -> {
+ List<HoodieLogFile> logFiles =
slice.getLogFiles().collect(Collectors.toList());
+ return Collections.singleton(logFiles.get(0)).stream();
+ })
+ .collect(Collectors.toList());
+
+ assertTrue(rollbackLogFiles.size() > 0, "Should have rollback log files
with rollback instant time");
+ for (HoodieLogFile logFile : rollbackLogFiles) {
+ String writeToken = logFile.getLogWriteToken();
+ assertFalse(writeToken.isEmpty(), "Write token should not be empty");
+ assertTrue(writeToken.matches("\\d+-\\d+-\\d+"),
+ String.format("Write token should match pattern
partitionId-stageId-attemptId, but got: %s in file: %s",
+ writeToken, logFile.getFileName()));
+ assertNotEquals("1-0-1", writeToken);
+ }
+
+ Map<String, List<String>> logFileNamesPostRollback =
collectLogFileNamesByFileId(fs, DEFAULT_FIRST_PARTITION_PATH);
+
+ // Simulate rollback retry: remove rollback timeline files and restore
commit 002 timeline + markers.
+ HoodieInstant lastRollbackInstant =
metaClient.getActiveTimeline().getRollbackTimeline().lastInstant().get();
+ String latestRollbackCompletedFileName =
+ INSTANT_FILE_NAME_GENERATOR.getFileName(lastRollbackInstant);
+ fs.delete(new Path(metaClient.getMetaPath().toString(),
latestRollbackCompletedFileName), false);
+ fs.delete(new Path(metaClient.getMetaPath().toString(),
+ rollbackTime + HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION), false);
+
+ if (requestedExists) {
+ FileUtil.copy(fs, new Path(backupDir, commitTime2 +
HoodieTimeline.REQUESTED_DELTA_COMMIT_EXTENSION),
+ fs, commit2RequestedPath, false, fs.getConf());
+ }
+ if (inflightExists) {
+ FileUtil.copy(fs, new Path(backupDir, commitTime2 +
HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION),
+ fs, commit2InflightPath, false, fs.getConf());
+ }
+ if (markerDirExists) {
+ FileUtil.copy(fs, backupMarkerDir, fs, commit2MarkerDir.getParent(),
false, fs.getConf());
+ }
+ fs.delete(backupDir, true);
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = this.getHoodieTable(metaClient, cfg);
+
+ // Trigger second rollback - should create additional rollback log files
with different write tokens.
+ MergeOnReadRollbackActionExecutor rollbackExecutor2 = new
MergeOnReadRollbackActionExecutor(
+ context, cfg, table, rollbackTime, rollBackInstant, true, false);
+ Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata2 =
rollbackExecutor2.execute().getPartitionMetadata();
+
+ assertEquals(1, rollbackMetadata2.size());
+ HoodieRollbackPartitionMetadata partitionMetadata2 =
rollbackMetadata2.get(DEFAULT_FIRST_PARTITION_PATH);
+ assertFalse(partitionMetadata2.getRollbackLogFiles().isEmpty(), "Should
have rollback log files");
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+ Map<String, List<String>> logFileNamesPost2ndRollback =
collectLogFileNamesByFileId(fs, DEFAULT_FIRST_PARTITION_PATH);
+ Map<String, Integer> filesFrom2ndRollback = new HashMap<>();
+ logFileNamesPost2ndRollback.forEach((fileId, fileNames) -> {
+ List<String> previousFiles =
logFileNamesPostRollback.getOrDefault(fileId, Collections.emptyList());
+ for (String fileName : fileNames) {
+ if (!previousFiles.contains(fileName)) {
+ filesFrom2ndRollback.merge(fileId, 1, Integer::sum);
+ assertNotEquals("1-0-1", new
HoodieLogFile(fileName).getLogWriteToken());
+ }
+ }
+ });
+
+ assertFalse(filesFrom2ndRollback.isEmpty(),
+ "Second rollback should produce at least one new log file (no
collision with first rollback)");
+ assertEquals(logFileNames.size(), filesFrom2ndRollback.size());
+ filesFrom2ndRollback.forEach((k, v) -> assertEquals(1, v));
+ client.close();
+ }
+
+ /**
+ * Lists all log files in the given partition and groups their file names by
file ID.
+ */
+ private Map<String, List<String>> collectLogFileNamesByFileId(FileSystem fs,
String partitionPath) throws IOException {
+ Map<String, List<String>> logFilesByFileId = new HashMap<>();
+ RemoteIterator<LocatedFileStatus> itr = fs.listFiles(
+ new Path(metaClient.getBasePath().toString() + "/" + partitionPath),
false);
+ while (itr.hasNext()) {
+ FileStatus fileStatus = itr.next();
+ String fileName = fileStatus.getPath().getName();
+ if (fileName.contains("log")) {
+ String fileId = FSUtils.getFileId(fileName);
Review Comment:
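🤖 nit: `fileName.contains("log")` is a loose substring match that could
inadvertently catch non-log files whose own names happen to contain the string
"log" (e.g. a base or temporary file with "log" somewhere in its name; directory
names are not a concern here, since `getPath().getName()` returns only the leaf
file name and `listFiles(..., false)` is non-recursive). Could you use
`FSUtils.isLogFile(fileName)` instead, which matches against Hudi's log-file
naming pattern, for a more precise filter?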
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java:
##########
@@ -487,4 +503,223 @@ public void testRollbackWhenFirstCommitFail() {
client.rollback(newCommitTime);
}
}
+
+ /**
+ * Tests that rollback operations generate unique write tokens for log
files, preventing collisions
+ * during repeated rollback attempts.
+ *
+ * <p>This test validates the fix for write token generation in metadata
table rollbacks. Previously,
+ * rollback log files used the default UNKNOWN_WRITE_TOKEN ("1-0-1"),
causing collisions when rollback
+ * was retried. Now, each rollback generates explicit write tokens based on
Spark task context
+ * (format: {partitionId}-{stageId}-{attemptId}).
+ *
+ * <p>Test flow:
+ * <ol>
+ * <li>Create initial commit with inserts to establish base files</li>
+ * <li>Create second commit with updates to generate log files (MOR
table)</li>
+ * <li>Backup commit timeline files and marker directory for repeated
rollback simulation</li>
+ * <li>Execute first rollback and validate write tokens are NOT
"1-0-1"</li>
+ * <li>Restore commit state (timeline files + markers) to simulate
rollback retry scenario</li>
+ * <li>Execute second rollback and validate unique write tokens prevent
collisions</li>
+ * <li>Verify exactly one new rollback log file per file group from second
attempt</li>
+ * </ol>
+ *
+ * @param enableMetadataTable runs the test both with and without metadata
table enabled to
+ * ensure write-token generation is correct in
both code paths
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testRollbackWriteTokenGeneration(boolean enableMetadataTable)
throws Exception {
+ // 1. Setup: Create a table-version-6 MOR table so the rollback exercises
RollbackHelperV1
+ // (which is what this test targets). On v8+ rollbacks delete files
directly and don't
+ // produce rollback log files.
+ Properties props = new Properties();
+ props.put(HoodieTableConfig.VERSION.key(),
HoodieTableVersion.SIX.versionCode());
+ tearDown();
+ initPath();
+ initSparkContexts();
+ dataGen = new HoodieTestDataGenerator(
+ new String[] {DEFAULT_FIRST_PARTITION_PATH,
DEFAULT_SECOND_PARTITION_PATH});
+ initHoodieStorage();
+ initMetaClient(HoodieTableType.MERGE_ON_READ, props);
+
+ HoodieWriteConfig cfg = getConfigBuilder()
+ .withRollbackUsingMarkers(true)
+ .withMarkersType(MarkerType.DIRECT.name())
+ .withWriteTableVersion(HoodieTableVersion.SIX.versionCode())
+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
+
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0).build())
+ .build();
+
+ HoodieTestDataGenerator.writePartitionMetadataDeprecated(
+ storage, new String[] {DEFAULT_FIRST_PARTITION_PATH}, basePath);
+ FileSystem fs = (FileSystem) storage.getFileSystem();
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+
+ // Write 1: Initial inserts
+ String commitTime1 = "001";
+ WriteClientTestUtils.startCommitWithTime(client, commitTime1);
+ List<HoodieRecord> records =
dataGen.generateInsertsForPartition(commitTime1, 100,
DEFAULT_FIRST_PARTITION_PATH);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ List<WriteStatus> statusList = client.upsert(writeRecords,
commitTime1).collect();
+ Assertions.assertNoWriteErrors(statusList);
+ client.commit(commitTime1, jsc.parallelize(statusList));
+
+ // Write 2: Updates to same partition to create log files. Use multiple
Spark partitions to
+ // exercise multiple task contexts (so write tokens vary across tasks).
+ String commitTime2 = "002";
+ WriteClientTestUtils.startCommitWithTime(client, commitTime2);
+ List<HoodieRecord> updateRecords = dataGen.generateUpdates(commitTime2,
records);
+ writeRecords = jsc.parallelize(updateRecords, 2);
+ statusList = client.upsert(writeRecords, commitTime2).collect();
+ Assertions.assertNoWriteErrors(statusList);
+ // Intentionally leave commit 002 in inflight state so rollback exercises
the inflight path.
+
+ HoodieTable table = this.getHoodieTable(metaClient, cfg);
+ Map<String, List<String>> logFileNames = collectLogFileNamesByFileId(fs,
DEFAULT_FIRST_PARTITION_PATH);
+ assertFalse(logFileNames.isEmpty());
+
+ // Backup commit 002 timeline files + marker dir so the rollback retry
below can replay the same input.
+ Path commit2RequestedPath = new Path(metaClient.getMetaPath().toString(),
+ commitTime2 + HoodieTimeline.REQUESTED_DELTA_COMMIT_EXTENSION);
+ Path commit2InflightPath = new Path(metaClient.getMetaPath().toString(),
+ commitTime2 + HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION);
+ Path commit2MarkerDir = new
Path(metaClient.getMarkerFolderPath(commitTime2));
+ Path backupDir = new Path(basePath, ".backup_test");
+ Path backupMarkerDir = new Path(backupDir, commitTime2);
+ fs.mkdirs(backupDir);
+
+ boolean requestedExists = fs.exists(commit2RequestedPath);
+ boolean inflightExists = fs.exists(commit2InflightPath);
+ boolean markerDirExists = fs.exists(commit2MarkerDir);
+
+ if (requestedExists) {
+ FileUtil.copy(fs, commit2RequestedPath, fs,
+ new Path(backupDir, commitTime2 +
HoodieTimeline.REQUESTED_DELTA_COMMIT_EXTENSION),
+ false, fs.getConf());
+ }
+ if (inflightExists) {
+ FileUtil.copy(fs, commit2InflightPath, fs,
+ new Path(backupDir, commitTime2 +
HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION),
+ false, fs.getConf());
+ }
+ if (markerDirExists) {
+ FileUtil.copy(fs, commit2MarkerDir, fs, backupMarkerDir, false,
fs.getConf());
+ }
+
+ // 3. Rollback commit 002
+ String rollbackTime = "003";
+ HoodieInstant rollBackInstant = INSTANT_GENERATOR.createNewInstant(
+ HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION,
commitTime2);
+ BaseRollbackPlanActionExecutor rollbackPlanExecutor = new
BaseRollbackPlanActionExecutor(
+ context, cfg, table, rollbackTime, rollBackInstant, false, true,
false);
+ rollbackPlanExecutor.execute().get();
+
+ MergeOnReadRollbackActionExecutor rollbackExecutor = new
MergeOnReadRollbackActionExecutor(
+ context, cfg, table, rollbackTime, rollBackInstant, true, false);
+ Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata =
rollbackExecutor.execute().getPartitionMetadata();
+
+ assertEquals(1, rollbackMetadata.size());
+ HoodieRollbackPartitionMetadata partitionMetadata =
rollbackMetadata.get(DEFAULT_FIRST_PARTITION_PATH);
+ assertFalse(partitionMetadata.getRollbackLogFiles().isEmpty(), "Should
have rollback log files");
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = this.getHoodieTable(metaClient, cfg);
+
+ // Validate write tokens on the rollback log files are per-task generated
(not UNKNOWN_WRITE_TOKEN "1-0-1").
+ List<FileSlice> rollbackFileSlices = table.getSliceView()
+ .getLatestFileSlices(DEFAULT_FIRST_PARTITION_PATH)
+ .collect(Collectors.toList());
+ // FileSlice.getLogFiles() is sorted highest-version first (reverse
comparator),
+ // so index 0 is the latest log file produced by the rollback.
+ List<HoodieLogFile> rollbackLogFiles = rollbackFileSlices.stream()
+ .flatMap(slice -> {
+ List<HoodieLogFile> logFiles =
slice.getLogFiles().collect(Collectors.toList());
+ return Collections.singleton(logFiles.get(0)).stream();
+ })
+ .collect(Collectors.toList());
+
+ assertTrue(rollbackLogFiles.size() > 0, "Should have rollback log files
with rollback instant time");
+ for (HoodieLogFile logFile : rollbackLogFiles) {
Review Comment:
🤖 nit: `assertTrue(rollbackLogFiles.size() > 0, ...)` — elsewhere the same test
checks non-emptiness with `assertFalse(...isEmpty(), ...)` (e.g.
`assertFalse(partitionMetadata.getRollbackLogFiles().isEmpty(), ...)` at line 625).
Could you make this consistent and use `assertFalse(rollbackLogFiles.isEmpty(), ...)`
here too?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]