This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 65866c4 [HUDI-1276] [HUDI-1459] Make Clustering/ReplaceCommit and Metadata table be compatible (#2422)
65866c4 is described below
commit 65866c45ec04820b01ab701e7de5cf6a406d2a8e
Author: vinoth chandar <[email protected]>
AuthorDate: Sat Jan 9 16:53:34 2021 -0800
[HUDI-1276] [HUDI-1459] Make Clustering/ReplaceCommit and Metadata table be compatible (#2422)
* [HUDI-1276] [HUDI-1459] Make Clustering/ReplaceCommit and Metadata table be compatible
* Use filesystemview and json format from metadata. Add tests
Co-authored-by: Satish Kotha <[email protected]>
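In short: the cleaner now recognizes replacecommit instants wherever it previously assumed plain commits, and the file system views expose replaced file groups so both the cleaner and the metadata table can reason about them. A minimal sketch of the pattern the CleanPlanner hunk below applies (the timeline wiring and the consumer of `partitions` are elided):

    // Replace commits carry partitionToReplaceFileIds in addition to partitionToWriteStats,
    // so both replaced and newly written partitions become candidates for cleaning.
    byte[] details = hoodieTable.getActiveTimeline().getInstantDetails(instant).get();
    Stream<String> partitions;
    if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
      HoodieReplaceCommitMetadata metadata =
          HoodieReplaceCommitMetadata.fromBytes(details, HoodieReplaceCommitMetadata.class);
      partitions = Stream.concat(metadata.getPartitionToReplaceFileIds().keySet().stream(),
          metadata.getPartitionToWriteStats().keySet().stream());
    } else {
      HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(details, HoodieCommitMetadata.class);
      partitions = metadata.getPartitionToWriteStats().keySet().stream();
    }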
---
.../hudi/table/HoodieTimelineArchiveLog.java | 4 +-
.../action/clean/BaseCleanActionExecutor.java | 4 +-
.../hudi/table/action/clean/CleanPlanner.java | 77 ++++++++++----
.../hudi/table/action/rollback/RollbackUtils.java | 1 +
.../hudi/metadata/TestHoodieBackedMetadata.java | 11 ++
.../java/org/apache/hudi/table/TestCleaner.java | 112 ++++++++++++++++++++-
.../table/timeline/TimelineMetadataUtils.java | 7 +-
.../table/view/AbstractTableFileSystemView.java | 20 ++++
.../table/view/PriorityBasedFileSystemView.java | 10 ++
.../view/RemoteHoodieTableFileSystemView.java | 30 ++++++
.../common/table/view/TableFileSystemView.java | 12 ++-
.../apache/hudi/common/util/ClusteringUtils.java | 2 +-
.../hudi/metadata/HoodieTableMetadataUtil.java | 7 ++
.../table/view/TestHoodieTableFileSystemView.java | 7 ++
.../hudi/common/testutils/HoodieTestTable.java | 5 +
.../timeline/service/FileSystemViewHandler.java | 15 +++
.../service/handlers/FileSliceHandler.java | 10 ++
17 files changed, 301 insertions(+), 33 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index 50967b1..3f4c271 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -290,10 +290,10 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
LOG.info("Wrapper schema " + wrapperSchema.toString());
List<IndexedRecord> records = new ArrayList<>();
for (HoodieInstant hoodieInstant : instants) {
+      // TODO HUDI-1518 Cleaner now takes care of removing replaced file groups. This call to deleteReplacedFileGroups can be removed.
       boolean deleteSuccess = deleteReplacedFileGroups(context, hoodieInstant);
       if (!deleteSuccess) {
-        // throw error and stop archival if deleting replaced file groups failed.
-        throw new HoodieCommitException("Unable to delete file(s) for " + hoodieInstant.getFileName());
+        LOG.warn("Unable to delete file(s) for " + hoodieInstant.getFileName() + ", replaced files possibly deleted by cleaner");
}
try {
deleteAnyLeftOverMarkerFiles(context, hoodieInstant);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
index 18e638e..786bf3e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
@@ -21,9 +21,9 @@ package org.apache.hudi.table.action.clean;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanFileInfo;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.avro.model.HoodieCleanFileInfo;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
@@ -72,7 +72,7 @@ public abstract class BaseCleanActionExecutor<T extends HoodieRecordPayload, I,
     List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
if (partitionsToClean.isEmpty()) {
- LOG.info("Nothing to clean here. It is already clean");
+ LOG.info("Nothing to clean here.");
       return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
     }
     LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 31d433d..321f248 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -111,14 +112,14 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
/**
* Returns list of partitions where clean operations needs to be performed.
*
-   * @param newInstantToRetain New instant to be retained after this cleanup operation
+   * @param earliestRetainedInstant New instant to be retained after this cleanup operation
* @return list of partitions to scan for cleaning
* @throws IOException when underlying file-system throws this exception
*/
-  public List<String> getPartitionPathsToClean(Option<HoodieInstant> newInstantToRetain) throws IOException {
+  public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetainedInstant) throws IOException {
switch (config.getCleanerPolicy()) {
case KEEP_LATEST_COMMITS:
- return getPartitionPathsForCleanByCommits(newInstantToRetain);
+ return getPartitionPathsForCleanByCommits(earliestRetainedInstant);
case KEEP_LATEST_FILE_VERSIONS:
return getPartitionPathsForFullCleaning();
default:
@@ -168,10 +169,16 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
        cleanMetadata.getEarliestCommitToRetain()) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
            HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())).flatMap(instant -> {
try {
-        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-            .fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
-                HoodieCommitMetadata.class);
-        return commitMetadata.getPartitionToWriteStats().keySet().stream();
+        if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
+          HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(
+              hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
+          return Stream.concat(replaceCommitMetadata.getPartitionToReplaceFileIds().keySet().stream(), replaceCommitMetadata.getPartitionToWriteStats().keySet().stream());
+        } else {
+          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+              .fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
+                  HoodieCommitMetadata.class);
+          return commitMetadata.getPartitionToWriteStats().keySet().stream();
+        }
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
@@ -196,13 +203,17 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
  private List<CleanFileInfo> getFilesToCleanKeepingLatestVersions(String partitionPath) {
    LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained()
        + " file versions. ");
-    List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
List<CleanFileInfo> deletePaths = new ArrayList<>();
// Collect all the datafiles savepointed by all the savepoints
List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
.flatMap(this::getSavepointedDataFiles)
.collect(Collectors.toList());
+    // In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
+    // In other words, the file versions only apply to the active file groups.
+    deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()));
+
+    List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) {
int keepVersions = config.getCleanerFileVersionsRetained();
// do not cleanup slice required for pending compaction
@@ -226,18 +237,7 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
// Delete the remaining files
while (fileSliceIterator.hasNext()) {
FileSlice nextSlice = fileSliceIterator.next();
-        if (nextSlice.getBaseFile().isPresent()) {
-          HoodieBaseFile dataFile = nextSlice.getBaseFile().get();
-          deletePaths.add(new CleanFileInfo(dataFile.getPath(), false));
-          if (dataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
-            deletePaths.add(new CleanFileInfo(dataFile.getBootstrapBaseFile().get().getPath(), true));
-          }
-        }
-        if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-          // If merge on read, then clean the log files for the commits as well
-          deletePaths.addAll(nextSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
-              .collect(Collectors.toList()));
-        }
+        deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
}
}
return deletePaths;
@@ -269,7 +269,11 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
// determine if we have enough commits, to start cleaning.
if (commitTimeline.countInstants() > commitsRetained) {
- HoodieInstant earliestCommitToRetain = getEarliestCommitToRetain().get();
+      Option<HoodieInstant> earliestCommitToRetainOption = getEarliestCommitToRetain();
+      HoodieInstant earliestCommitToRetain = earliestCommitToRetainOption.get();
+      // all replaced file groups before earliestCommitToRetain are eligible to clean
+      deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetainOption));
+      // add active files
       List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
       for (HoodieFileGroup fileGroup : fileGroups) {
         List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
@@ -322,6 +326,20 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
}
return deletePaths;
}
+
+  private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String> savepointedFiles, String partitionPath, Option<HoodieInstant> earliestCommitToRetain) {
+    final Stream<HoodieFileGroup> replacedGroups;
+    if (earliestCommitToRetain.isPresent()) {
+      replacedGroups = fileSystemView.getReplacedFileGroupsBefore(earliestCommitToRetain.get().getTimestamp(), partitionPath);
+    } else {
+      replacedGroups = fileSystemView.getAllReplacedFileGroups(partitionPath);
+    }
+    return replacedGroups.flatMap(HoodieFileGroup::getAllFileSlices)
+        // do not delete savepointed files (archival will make sure corresponding replacecommit file is not deleted)
+        .filter(slice -> !slice.getBaseFile().isPresent() || !savepointedFiles.contains(slice.getBaseFile().get().getFileName()))
+ .flatMap(slice -> getCleanFileInfoForSlice(slice).stream())
+ .collect(Collectors.toList());
+ }
/**
   * Gets the latest version < instantTime. This version file could still be used by queries.
@@ -339,6 +357,23 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
return null;
}
+ private List<CleanFileInfo> getCleanFileInfoForSlice(FileSlice nextSlice) {
+ List<CleanFileInfo> cleanPaths = new ArrayList<>();
+ if (nextSlice.getBaseFile().isPresent()) {
+ HoodieBaseFile dataFile = nextSlice.getBaseFile().get();
+ cleanPaths.add(new CleanFileInfo(dataFile.getPath(), false));
+      if (dataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
+        cleanPaths.add(new CleanFileInfo(dataFile.getBootstrapBaseFile().get().getPath(), true));
+      }
+    }
+    if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
+      // If merge on read, then clean the log files for the commits as well
+      cleanPaths.addAll(nextSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
+          .collect(Collectors.toList()));
+ }
+ return cleanPaths;
+ }
+
/**
   * Returns files to be cleaned for the given partitionPath based on cleaning policy.
*/
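The net rule in CleanPlanner: under KEEP_LATEST_FILE_VERSIONS every replaced file group is immediately eligible (the Option.empty() call above), while under KEEP_LATEST_COMMITS only groups replaced strictly before the earliest retained commit are. A usage sketch of the new view API (partition path and instant are illustrative values, not from this commit):

    // Illustrative: base-file paths the cleaner would consider in one partition,
    // given an earliest commit to retain of "0000003".
    List<String> candidates = fileSystemView
        .getReplacedFileGroupsBefore("0000003", "2020/01/01")
        .flatMap(HoodieFileGroup::getAllFileSlices)
        .filter(slice -> slice.getBaseFile().isPresent())
        .map(slice -> slice.getBaseFile().get().getPath())
        .collect(Collectors.toList());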
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
index 18f284e..ee7f4dd 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
@@ -122,6 +122,7 @@ public class RollbackUtils {
    List<ListingBasedRollbackRequest> partitionRollbackRequests = new ArrayList<>();
switch (instantToRollback.getAction()) {
case HoodieTimeline.COMMIT_ACTION:
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
LOG.info("Rolling back commit action.");
partitionRollbackRequests.add(
ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
index 32cec71..5932236 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
@@ -18,6 +18,7 @@
package org.apache.hudi.metadata;
+import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -498,6 +499,15 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
    writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
assertFalse(metadata(client).isInSync());
+
+ // insert overwrite to test replacecommit
+ newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
+    records = dataGen.generateInserts(newCommitTime, 5);
+    HoodieWriteResult replaceResult = client.insertOverwrite(jsc.parallelize(records, 1), newCommitTime);
+ writeStatuses = replaceResult.getWriteStatuses().collect();
+ assertNoWriteErrors(writeStatuses);
+ assertFalse(metadata(client).isInSync());
}
// Enable metadata table and ensure it is synced
@@ -800,6 +810,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
// FileSystemView should expose the same data
      List<HoodieFileGroup> fileGroups = tableView.getAllFileGroups(partition).collect(Collectors.toList());
+      fileGroups.addAll(tableView.getAllReplacedFileGroups(partition).collect(Collectors.toList()));
      fileGroups.forEach(g -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(g));
      fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(b)));
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 69c6f98..3a5d737 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -18,6 +18,8 @@
package org.apache.hudi.table;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
@@ -38,9 +40,11 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -57,6 +61,7 @@ import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -65,9 +70,6 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndex;
import org.apache.hudi.table.action.clean.CleanPlanner;
import org.apache.hudi.testutils.HoodieClientTestBase;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -76,6 +78,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
+import scala.Tuple3;
import java.io.File;
import java.io.IOException;
@@ -96,8 +99,6 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import scala.Tuple3;
-
import static org.apache.hudi.common.testutils.HoodieTestTable.makeIncrementalCommitTimes;
import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
@@ -687,6 +688,107 @@ public class TestCleaner extends HoodieClientTestBase {
assertTrue(testTable.baseFileExists(p0, "002", file1P0));
assertTrue(testTable.logFileExists(p0, "002", file1P0, 4));
}
+
+ @Test
+ public void testCleanWithReplaceCommits() throws Exception {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
+ .build();
+
+ HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+ String p0 = "2020/01/01";
+ String p1 = "2020/01/02";
+
+ // make 1 commit, with 1 file per partition
+ String file1P0C0 = UUID.randomUUID().toString();
+ String file1P1C0 = UUID.randomUUID().toString();
+    testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
+
+ HoodieCommitMetadata commitMetadata = generateCommitMetadata(
+ Collections.unmodifiableMap(new HashMap<String, List<String>>() {
+ {
+ put(p0, CollectionUtils.createImmutableList(file1P0C0));
+ put(p1, CollectionUtils.createImmutableList(file1P1C0));
+ }
+ })
+ );
+ metaClient.getActiveTimeline().saveAsComplete(
+        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000001"),
+        Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+ List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config);
+    assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files");
+ assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+ assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+    // make next replacecommit, with 1 clustering operation. logically delete p0. No change to p1
+    Map<String, String> partitionAndFileId002 = testTable.forReplaceCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0);
+    String file2P0C1 = partitionAndFileId002.get(p0);
+    testTable.addReplaceCommit("00000000000002", generateReplaceCommitMetadata(p0, file1P0C0, file2P0C1));
+
+ // run cleaner
+ List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config);
+    assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files");
+ assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+ assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+ assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+    // make next replacecommit, with 1 clustering operation. Replace data in p1. No change to p0
+    Map<String, String> partitionAndFileId003 = testTable.forReplaceCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p1);
+    String file3P1C2 = partitionAndFileId003.get(p1);
+    testTable.addReplaceCommit("00000000000003", generateReplaceCommitMetadata(p1, file1P1C0, file3P1C2));
+
+ // run cleaner
+ List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config);
+    assertEquals(0, hoodieCleanStatsThree.size(), "Must not scan any partitions and clean any files");
+ assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+ assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+ assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
+ assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+    // make next replacecommit, with 1 clustering operation. Replace data in p0 again
+    Map<String, String> partitionAndFileId004 = testTable.forReplaceCommit("00000000000004").getFileIdsWithBaseFilesInPartitions(p0);
+    String file4P0C3 = partitionAndFileId004.get(p0);
+    testTable.addReplaceCommit("00000000000004", generateReplaceCommitMetadata(p0, file2P0C1, file4P0C3));
+
+ // run cleaner
+ List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
+ assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
+ assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+ assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
+ assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    //file1P1C0 still stays because its not replaced until 3 and its the only version available
+ assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+    // make next replacecommit, with 1 clustering operation. Replace all data in p1. no new files created
+    Map<String, String> partitionAndFileId005 = testTable.forReplaceCommit("00000000000005").getFileIdsWithBaseFilesInPartitions(p1);
+    String file4P1C4 = partitionAndFileId005.get(p1);
+    testTable.addReplaceCommit("00000000000005", generateReplaceCommitMetadata(p1, file3P1C2, file4P1C4));
+
+ List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, 2);
+ assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
+ assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+ assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
+ assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+ assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+ }
+
+  private HoodieReplaceCommitMetadata generateReplaceCommitMetadata(String partition, String replacedFileId, String newFileId) {
+    HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
+ replaceMetadata.addReplaceFileId(partition, replacedFileId);
+ replaceMetadata.setOperationType(WriteOperationType.CLUSTER);
+ if (!StringUtils.isNullOrEmpty(newFileId)) {
+ HoodieWriteStat writeStat = new HoodieWriteStat();
+ writeStat.setPartitionPath(partition);
+ writeStat.setPath(newFileId);
+ writeStat.setFileId(newFileId);
+ replaceMetadata.addWriteStat(partition, writeStat);
+ }
+ return replaceMetadata;
+ }
@Test
public void testCleanMetadataUpgradeDowngrade() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
index 32e60c3..962d69d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
@@ -33,6 +33,7 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieInstantInfo;
+import org.apache.hudi.avro.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -158,10 +159,14 @@ public class TimelineMetadataUtils {
return deserializeAvroMetadata(bytes, HoodieSavepointMetadata.class);
}
-  public static HoodieRequestedReplaceMetadata deserializeRequestedReplaceMetadta(byte[] bytes) throws IOException {
+  public static HoodieRequestedReplaceMetadata deserializeRequestedReplaceMetadata(byte[] bytes) throws IOException {
     return deserializeAvroMetadata(bytes, HoodieRequestedReplaceMetadata.class);
}
+  public static HoodieReplaceCommitMetadata deserializeHoodieReplaceMetadata(byte[] bytes) throws IOException {
+ return deserializeAvroMetadata(bytes, HoodieReplaceCommitMetadata.class);
+ }
+
  public static <T extends SpecificRecordBase> T deserializeAvroMetadata(byte[] bytes, Class<T> clazz)
throws IOException {
DatumReader<T> reader = new SpecificDatumReader<>(clazz);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 65e9231..3f45715 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -62,6 +62,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS;
@@ -691,6 +692,16 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
}
@Override
+  public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) {
+    return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplacedBefore(fg.getFileGroupId(), maxCommitTime));
+  }
+
+  @Override
+  public Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath) {
+    return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplaced(fg.getFileGroupId()));
+  }
+
+ @Override
  public final Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClustering() {
try {
readLock.lock();
@@ -1041,6 +1052,15 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
    return isFileGroupReplacedBeforeOrOn(fileGroupId, instants.stream().max(Comparator.naturalOrder()).get());
}
+  private boolean isFileGroupReplacedBefore(HoodieFileGroupId fileGroupId, String instant) {
+ Option<HoodieInstant> hoodieInstantOption = getReplaceInstant(fileGroupId);
+ if (!hoodieInstantOption.isPresent()) {
+ return false;
+ }
+
+    return HoodieTimeline.compareTimestamps(instant, GREATER_THAN, hoodieInstantOption.get().getTimestamp());
+ }
+
  private boolean isFileGroupReplacedBeforeOrOn(HoodieFileGroupId fileGroupId, String instant) {
Option<HoodieInstant> hoodieInstantOption = getReplaceInstant(fileGroupId);
if (!hoodieInstantOption.isPresent()) {
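The new predicate is deliberately strict: a file group replaced exactly at maxCommitTime is excluded, unlike the existing beforeOrOn variant. A small illustration of the underlying comparison, using the same call shape as the hunks above (timestamps are made up):

    // compareTimestamps(a, GREATER_THAN, b) is true iff instant time a sorts after b, so a
    // group replaced at "0000002" is strictly before "0000003" but not before "0000002".
    boolean strictlyBefore = HoodieTimeline.compareTimestamps("0000003", GREATER_THAN, "0000002"); // true
    boolean atBoundary = HoodieTimeline.compareTimestamps("0000002", GREATER_THAN, "0000002");     // false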
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index f7244ee..3783d00 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -200,6 +200,16 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
}
@Override
+  public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) {
+    return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBefore, secondaryView::getReplacedFileGroupsBefore);
+ }
+
+ @Override
+  public Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath) {
+    return execute(partitionPath, preferredView::getAllReplacedFileGroups, secondaryView::getAllReplacedFileGroups);
+ }
+
+ @Override
  public Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations() {
    return execute(preferredView::getPendingCompactionOperations, secondaryView::getPendingCompactionOperations);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index 91a28a8..23b0536 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -91,6 +91,12 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
public static final String ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON =
String.format("%s/%s", BASE_URL, "filegroups/replaced/beforeoron/");
+ public static final String ALL_REPLACED_FILEGROUPS_BEFORE =
+ String.format("%s/%s", BASE_URL, "filegroups/replaced/before/");
+
+ public static final String ALL_REPLACED_FILEGROUPS_PARTITION =
+ String.format("%s/%s", BASE_URL, "filegroups/replaced/partition/");
+
public static final String PENDING_CLUSTERING_FILEGROUPS =
String.format("%s/%s", BASE_URL, "clustering/pending/");
@@ -380,6 +386,30 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
}
}
+ @Override
+  public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) {
+    Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
+    try {
+      List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE, paramsMap,
+          new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
+      return fileGroups.stream().map(dto -> FileGroupDTO.toFileGroup(dto, metaClient));
+ } catch (IOException e) {
+ throw new HoodieRemoteException(e);
+ }
+ }
+
+ @Override
+  public Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath) {
+    Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
+    try {
+      List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_PARTITION, paramsMap,
+          new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
+      return fileGroups.stream().map(dto -> FileGroupDTO.toFileGroup(dto, metaClient));
+ } catch (IOException e) {
+ throw new HoodieRemoteException(e);
+ }
+ }
+
public boolean refresh() {
Map<String, String> paramsMap = getParams();
try {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
index 504f95a..7330286 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
@@ -167,11 +167,21 @@ public interface TableFileSystemView {
HoodieTimeline getTimeline();
/**
- * Stream all the replaced file groups before maxCommitTime.
+   * Stream all the replaced file groups before or on maxCommitTime for given partition.
*/
  Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath);
/**
+   * Stream all the replaced file groups before maxCommitTime for given partition.
+   */
+  Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath);
+
+ /**
+ * Stream all the replaced file groups for given partition.
+ */
+ Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath);
+
+ /**
* Filegroups that are in pending clustering.
*/
  Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClustering();
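The three accessors differ only in the time bound applied to each file group's replacecommit timestamp. A sketch of the expected behavior for a group replaced at instant "2", consistent with the TestHoodieTableFileSystemView assertions further down:

    // Sketch: counts for a single file group replaced at instant "2".
    static void illustrate(TableFileSystemView view, String partitionPath) {
      long beforeOrOn = view.getReplacedFileGroupsBeforeOrOn("2", partitionPath).count(); // 1, inclusive bound
      long before = view.getReplacedFileGroupsBefore("2", partitionPath).count();         // 0, strictly-before bound
      long all = view.getAllReplacedFileGroups(partitionPath).count();                    // 1, no bound
    }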
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index fcc3274..70dfa2a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -86,7 +86,7 @@ public class ClusteringUtils {
LOG.warn("No content found in requested file for instant " +
pendingReplaceInstant);
return Option.empty();
}
-    HoodieRequestedReplaceMetadata requestedReplaceMetadata = TimelineMetadataUtils.deserializeRequestedReplaceMetadta(content.get());
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = TimelineMetadataUtils.deserializeRequestedReplaceMetadata(content.get());
    if (WriteOperationType.CLUSTER.name().equals(requestedReplaceMetadata.getOperationType())) {
      return Option.of(Pair.of(pendingReplaceInstant, requestedReplaceMetadata.getClusteringPlan()));
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 115001a..ed2a878 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -25,6 +25,7 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -92,6 +93,12 @@ public class HoodieTableMetadataUtil {
case HoodieTimeline.SAVEPOINT_ACTION:
// Nothing to be done here
break;
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+        HoodieReplaceCommitMetadata replaceMetadata = HoodieReplaceCommitMetadata.fromBytes(
+            timeline.getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
+        // Note: we only add new files created here. Replaced files are removed from metadata later by cleaner.
+        records = Option.of(convertMetadataToRecords(replaceMetadata, instant.getTimestamp()));
+ break;
default:
        throw new HoodieException("Unknown type of action " + instant.getAction());
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
index 3fceee3..e103427 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
@@ -1356,6 +1356,13 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
    List<HoodieFileGroup> allReplaced = fsView.getReplacedFileGroupsBeforeOrOn("2", partitionPath1).collect(Collectors.toList());
assertEquals(1, allReplaced.size());
assertEquals(fileId1, allReplaced.get(0).getFileGroupId().getFileId());
+
+    allReplaced = fsView.getReplacedFileGroupsBefore("2", partitionPath1).collect(Collectors.toList());
+ assertEquals(0, allReplaced.size());
+
+    allReplaced = fsView.getAllReplacedFileGroups(partitionPath1).collect(Collectors.toList());
+ assertEquals(1, allReplaced.size());
+ assertEquals(fileId1, allReplaced.get(0).getFileGroupId().getFileId());
}
@Test
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 3663917..858e113 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -228,6 +228,11 @@ public class HoodieTestTable {
return this;
}
+ public HoodieTestTable forReplaceCommit(String instantTime) {
+ currentInstantTime = instantTime;
+ return this;
+ }
+
public HoodieTestTable forCompaction(String instantTime) {
currentInstantTime = instantTime;
return this;
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
index e008fc5..b3e860a 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
@@ -299,6 +299,21 @@ public class FileSystemViewHandler {
writeValueAsString(ctx, dtos);
}, true));
+    app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE, new ViewHandler(ctx -> {
+      List<FileGroupDTO> dtos = sliceHandler.getReplacedFileGroupsBefore(
+          ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
+ ctx.queryParam(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,""),
+ ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
+ writeValueAsString(ctx, dtos);
+ }, true));
+
+    app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_PARTITION, new ViewHandler(ctx -> {
+      List<FileGroupDTO> dtos = sliceHandler.getAllReplacedFileGroups(
+          ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
+ ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
+ writeValueAsString(ctx, dtos);
+ }, true));
+
    app.get(RemoteHoodieTableFileSystemView.PENDING_CLUSTERING_FILEGROUPS, new ViewHandler(ctx -> {
      List<ClusteringOpDTO> dtos = sliceHandler.getFileGroupsInPendingClustering(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow());
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
index 18c5eb1..2180e4e 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
@@ -94,6 +94,16 @@ public class FileSliceHandler extends Handler {
.collect(Collectors.toList());
}
+  public List<FileGroupDTO> getReplacedFileGroupsBefore(String basePath, String maxCommitTime, String partitionPath) {
+    return viewManager.getFileSystemView(basePath).getReplacedFileGroupsBefore(maxCommitTime, partitionPath).map(FileGroupDTO::fromFileGroup)
+        .collect(Collectors.toList());
+ }
+
+  public List<FileGroupDTO> getAllReplacedFileGroups(String basePath, String partitionPath) {
+    return viewManager.getFileSystemView(basePath).getAllReplacedFileGroups(partitionPath).map(FileGroupDTO::fromFileGroup)
+        .collect(Collectors.toList());
+ }
+
  public List<ClusteringOpDTO> getFileGroupsInPendingClustering(String basePath) {
    return viewManager.getFileSystemView(basePath).getFileGroupsInPendingClustering()
        .map(fgInstant -> ClusteringOpDTO.fromClusteringOp(fgInstant.getLeft(), fgInstant.getRight()))