codope commented on code in PR #8774:
URL: https://github.com/apache/hudi/pull/8774#discussion_r1200730726
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java:
##########
@@ -102,13 +106,104 @@ void testCleaningDeltaCommits() throws Exception {
if (i == 1) {
Option<String> compactionInstant =
client.scheduleCompaction(Option.empty());
assertTrue(compactionInstant.isPresent(), "A compaction plan should
be scheduled");
+ compactionCommit = compactionInstant.get();
client.compact(compactionInstant.get());
}
}
// restore
client.restoreToSavepoint(Objects.requireNonNull(savepointCommit,
"restore commit should not be null"));
assertRowNumberEqualsTo(30);
+ // ensure there are no data files matching the compaction commit that
was rolled back.
+ String finalCompactionCommit = compactionCommit;
+ PathFilter filter = (path) ->
path.toString().contains(finalCompactionCommit);
+ for (String pPath : dataGen.getPartitionPaths()) {
+ assertEquals(0,
fs.listStatus(FSUtils.getPartitionPath(hoodieWriteConfig.getBasePath(), pPath),
filter).length);
+ }
+ }
+ }
+
+ @Test
+ public void testRestoreWithFileGroupCreatedWithDeltaCommits() throws
IOException {
+ HoodieWriteConfig hoodieWriteConfig =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(4)
+ .withInlineCompaction(true)
+ .build())
+ .withRollbackUsingMarkers(true)
+ .build();
+ final int numRecords = 100;
+ String firstCommit;
+ String secondCommit;
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
+ // 1st commit insert
+ String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime,
numRecords);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ client.startCommitWithTime(newCommitTime);
+ client.insert(writeRecords, newCommitTime);
+ firstCommit = newCommitTime;
+
+ // 2nd commit with inserts and updates which will create new file slice
due to small file handling.
+ newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ List<HoodieRecord> records2 =
dataGen.generateUniqueUpdates(newCommitTime, numRecords);
+ JavaRDD<HoodieRecord> writeRecords2 = jsc.parallelize(records2, 1);
+ List<HoodieRecord> records3 = dataGen.generateInserts(newCommitTime, 30);
+ JavaRDD<HoodieRecord> writeRecords3 = jsc.parallelize(records3, 1);
+
+ client.startCommitWithTime(newCommitTime);
+ client.upsert(writeRecords2.union(writeRecords3), newCommitTime);
+ secondCommit = newCommitTime;
+ // add savepoint to 2nd commit
+ client.savepoint(firstCommit, "test user","test comment");
Review Comment:
Should this be `secondCommit`? The comment above says the savepoint is added to the 2nd commit, but the code passes `firstCommit`.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java:
##########
@@ -102,13 +106,104 @@ void testCleaningDeltaCommits() throws Exception {
if (i == 1) {
Option<String> compactionInstant =
client.scheduleCompaction(Option.empty());
assertTrue(compactionInstant.isPresent(), "A compaction plan should
be scheduled");
+ compactionCommit = compactionInstant.get();
client.compact(compactionInstant.get());
}
}
// restore
client.restoreToSavepoint(Objects.requireNonNull(savepointCommit,
"restore commit should not be null"));
assertRowNumberEqualsTo(30);
+ // ensure there are no data files matching the compaction commit that
was rolled back.
+ String finalCompactionCommit = compactionCommit;
+ PathFilter filter = (path) ->
path.toString().contains(finalCompactionCommit);
+ for (String pPath : dataGen.getPartitionPaths()) {
+ assertEquals(0,
fs.listStatus(FSUtils.getPartitionPath(hoodieWriteConfig.getBasePath(), pPath),
filter).length);
+ }
+ }
+ }
+
+ @Test
+ public void testRestoreWithFileGroupCreatedWithDeltaCommits() throws
IOException {
+ HoodieWriteConfig hoodieWriteConfig =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(4)
+ .withInlineCompaction(true)
+ .build())
+ .withRollbackUsingMarkers(true)
+ .build();
+ final int numRecords = 100;
+ String firstCommit;
+ String secondCommit;
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
+ // 1st commit insert
+ String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime,
numRecords);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ client.startCommitWithTime(newCommitTime);
+ client.insert(writeRecords, newCommitTime);
+ firstCommit = newCommitTime;
+
+ // 2nd commit with inserts and updates which will create new file slice
due to small file handling.
+ newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ List<HoodieRecord> records2 =
dataGen.generateUniqueUpdates(newCommitTime, numRecords);
+ JavaRDD<HoodieRecord> writeRecords2 = jsc.parallelize(records2, 1);
+ List<HoodieRecord> records3 = dataGen.generateInserts(newCommitTime, 30);
+ JavaRDD<HoodieRecord> writeRecords3 = jsc.parallelize(records3, 1);
+
+ client.startCommitWithTime(newCommitTime);
+ client.upsert(writeRecords2.union(writeRecords3), newCommitTime);
+ secondCommit = newCommitTime;
+ // add savepoint to 2nd commit
+ client.savepoint(firstCommit, "test user","test comment");
+ }
+ assertRowNumberEqualsTo(130);
+ // verify there are new base files created matching the 2nd commit
timestamp.
+ PathFilter filter = (path) -> path.toString().contains(secondCommit);
+ for (String pPath : dataGen.getPartitionPaths()) {
+ assertEquals(1,
fs.listStatus(FSUtils.getPartitionPath(hoodieWriteConfig.getBasePath(), pPath),
filter).length);
+ }
+
+ // disable small file handling so that updates go to log files.
+ hoodieWriteConfig =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(4) // the 4th delta_commit
triggers compaction
+ .withInlineCompaction(true)
+ .compactionSmallFileSize(0)
+ .build())
+ .withRollbackUsingMarkers(true)
+ .build();
+
+ // add 2 more updates which will create log files.
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
+ String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ List<HoodieRecord> records =
dataGen.generateUniqueUpdates(newCommitTime, numRecords);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ client.startCommitWithTime(newCommitTime);
+ client.upsert(writeRecords, newCommitTime);
+
+ newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ records = dataGen.generateUniqueUpdates(newCommitTime, numRecords);
+ writeRecords = jsc.parallelize(records, 1);
+ client.startCommitWithTime(newCommitTime);
+ client.upsert(writeRecords, newCommitTime);
+ }
+ assertRowNumberEqualsTo(130);
+
+ // restore to 2nd commit.
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
+ client.restoreToSavepoint(firstCommit);
Review Comment:
Same here — should this restore to `secondCommit` instead of `firstCommit`, given the comment says "restore to 2nd commit"?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java:
##########
@@ -117,21 +126,22 @@ public List<HoodieRollbackRequest>
getRollbackRequests(HoodieInstant instantToRo
// If there is no delta commit present after the current commit
(if compaction), no action, else we
// need to make sure that a compaction commit rollback also
deletes any log files written as part of the
// succeeding deltacommit.
- boolean higherDeltaCommits =
+ boolean hasHigherCompletedDeltaCommits =
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit,
1)
.empty();
- if (higherDeltaCommits) {
- // Rollback of a compaction action with no higher deltacommit
means that the compaction is scheduled
+ if (hasHigherCompletedDeltaCommits &&
!isCommitMetadataCompleted) {
Review Comment:
How is it possible for the compaction commit to be completed while higher delta
commits also exist? Does that only occur when restore runs concurrently with ingestion?
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java:
##########
@@ -102,13 +106,104 @@ void testCleaningDeltaCommits() throws Exception {
if (i == 1) {
Option<String> compactionInstant =
client.scheduleCompaction(Option.empty());
assertTrue(compactionInstant.isPresent(), "A compaction plan should
be scheduled");
+ compactionCommit = compactionInstant.get();
client.compact(compactionInstant.get());
}
}
// restore
client.restoreToSavepoint(Objects.requireNonNull(savepointCommit,
"restore commit should not be null"));
assertRowNumberEqualsTo(30);
+ // ensure there are no data files matching the compaction commit that
was rolled back.
+ String finalCompactionCommit = compactionCommit;
+ PathFilter filter = (path) ->
path.toString().contains(finalCompactionCommit);
+ for (String pPath : dataGen.getPartitionPaths()) {
+ assertEquals(0,
fs.listStatus(FSUtils.getPartitionPath(hoodieWriteConfig.getBasePath(), pPath),
filter).length);
+ }
+ }
+ }
+
+ @Test
+ public void testRestoreWithFileGroupCreatedWithDeltaCommits() throws
IOException {
+ HoodieWriteConfig hoodieWriteConfig =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(4)
+ .withInlineCompaction(true)
+ .build())
+ .withRollbackUsingMarkers(true)
+ .build();
+ final int numRecords = 100;
+ String firstCommit;
+ String secondCommit;
+ try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{
+ // 1st commit insert
+ String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime,
numRecords);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ client.startCommitWithTime(newCommitTime);
+ client.insert(writeRecords, newCommitTime);
+ firstCommit = newCommitTime;
+
+ // 2nd commit with inserts and updates which will create new file slice
due to small file handling.
+ newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ List<HoodieRecord> records2 =
dataGen.generateUniqueUpdates(newCommitTime, numRecords);
+ JavaRDD<HoodieRecord> writeRecords2 = jsc.parallelize(records2, 1);
+ List<HoodieRecord> records3 = dataGen.generateInserts(newCommitTime, 30);
+ JavaRDD<HoodieRecord> writeRecords3 = jsc.parallelize(records3, 1);
+
+ client.startCommitWithTime(newCommitTime);
+ client.upsert(writeRecords2.union(writeRecords3), newCommitTime);
+ secondCommit = newCommitTime;
+ // add savepoint to 2nd commit
+ client.savepoint(firstCommit, "test user","test comment");
+ }
+ assertRowNumberEqualsTo(130);
+ // verify there are new base files created matching the 2nd commit
timestamp.
+ PathFilter filter = (path) -> path.toString().contains(secondCommit);
+ for (String pPath : dataGen.getPartitionPaths()) {
+ assertEquals(1,
fs.listStatus(FSUtils.getPartitionPath(hoodieWriteConfig.getBasePath(), pPath),
filter).length);
+ }
+
+ // disable small file handling so that updates go to log files.
+ hoodieWriteConfig =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER) // eager cleaning
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(4) // the 4th delta_commit
triggers compaction
+ .withInlineCompaction(true)
+ .compactionSmallFileSize(0)
+ .build())
+ .withRollbackUsingMarkers(true)
Review Comment:
Shouldn't `withRollbackUsingMarkers` be `false` here, so that the test actually exercises `ListingBasedRollbackStrategy`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]