yihua commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r770134444
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -249,6 +249,19 @@
+ "record size estimate compute dynamically based on commit metadata. "
+ " This is critical in computing the insert parallelism and bin-packing inserts into small files.");
+ public static final ConfigProperty<String> MAX_ARCHIVE_FILES_TO_KEEP_PROP = ConfigProperty
+     .key("hoodie.max.archive.files")
+     .defaultValue("10")
+     .withDocumentation("The numbers of kept archive files under archived.");
+
+ public static final ConfigProperty<String> AUTO_TRIM_ARCHIVE_FILES_DROP = ConfigProperty
+     .key("hoodie.auto.trim.archive.files")
+     .defaultValue("false")
+     .withDocumentation("When enabled, Hoodie will keep the most recent " + MAX_ARCHIVE_FILES_TO_KEEP_PROP.key()
Review comment:
Let's add a `WARNING` in both configs, something like: `WARNING: do not use this config unless you know what you're doing. If enabled, details of older archived instants are deleted, resulting in information loss in the archived timeline, which may affect tools like the CLI and repair. Only enable this if you hit severe performance issues when retrieving the archived timeline.` (feel free to add more details)
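One way the suggested warning could be folded into the config's documentation (a sketch only; the exact wording, and whether the same text is mirrored into `MAX_ARCHIVE_FILES_TO_KEEP_PROP`, is up to the PR author):

```java
// Sketch: the same ConfigProperty as in the PR, with the suggested WARNING appended.
public static final ConfigProperty<String> AUTO_TRIM_ARCHIVE_FILES_DROP = ConfigProperty
    .key("hoodie.auto.trim.archive.files")
    .defaultValue("false")
    .withDocumentation("When enabled, Hoodie will keep the most recent " + MAX_ARCHIVE_FILES_TO_KEEP_PROP.key()
        + " archive files and delete older ones. WARNING: do not use this config unless you know what"
        + " you're doing. If enabled, details of older archived instants are deleted, resulting in"
        + " information loss in the archived timeline, which may affect tools like the CLI and repair."
        + " Only enable this if you hit severe performance issues when retrieving the archived timeline.");
```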
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -249,6 +249,19 @@
+ "record size estimate compute dynamically based on commit metadata. "
+ " This is critical in computing the insert parallelism and bin-packing inserts into small files.");
+ public static final ConfigProperty<String> MAX_ARCHIVE_FILES_TO_KEEP_PROP = ConfigProperty
+     .key("hoodie.max.archive.files")
+     .defaultValue("10")
+     .withDocumentation("The numbers of kept archive files under archived.");
+
+ public static final ConfigProperty<String> AUTO_TRIM_ARCHIVE_FILES_DROP = ConfigProperty
+     .key("hoodie.auto.trim.archive.files")
+     .defaultValue("false")
+     .withDocumentation("When enabled, Hoodie will keep the most recent " + MAX_ARCHIVE_FILES_TO_KEEP_PROP.key()
+         + " archive files and delete older one which lose part of archived instants information.");
Review comment:
Should these configs live in `HoodieWriteConfig` instead of
`HoodieCompactionConfig`?
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -249,6 +249,19 @@
+ "record size estimate compute dynamically based on commit metadata. "
+ " This is critical in computing the insert parallelism and bin-packing inserts into small files.");
+ public static final ConfigProperty<String> MAX_ARCHIVE_FILES_TO_KEEP_PROP = ConfigProperty
+     .key("hoodie.max.archive.files")
+     .defaultValue("10")
Review comment:
Let's make this `noDefault()` in case it's accidentally invoked?
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -140,6 +146,48 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException
}
}
+ private void trimArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+   Stream<HoodieLogFile> allLogFiles = FSUtils.getAllLogFiles(metaClient.getFs(),
+       archiveFilePath.getParent(),
+       archiveFilePath.getName(),
+       HoodieArchivedLogFile.ARCHIVE_EXTENSION,
+       "");
+   List<HoodieLogFile> sortedLogFilesList = allLogFiles.sorted(HoodieLogFile.getReverseLogFileComparator()).collect(Collectors.toList());
+   if (!sortedLogFilesList.isEmpty()) {
+     List<String> skipped = sortedLogFilesList.stream().skip(maxArchiveFilesToKeep).map(HoodieLogFile::getPath).map(Path::toString).collect(Collectors.toList());
Review comment:
nit: `skipped` -> `archiveFilesToDelete`
##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
##########
@@ -183,6 +217,41 @@ public void testArchiveTableWithArchival(boolean enableMetadata) throws Exceptio
}
}
+ @ParameterizedTest
+ @MethodSource("testArchiveTableWithArchivalTrim")
+ public void testArchiveTableWithArchivalCleanUp(boolean enableMetadata, boolean enableArchiveTrim, int archiveFilesToKeep) throws Exception {
+   HashSet<String> archiveFilesExisted = new HashSet<>();
+   ArrayList<String> currentExistArchiveFiles = new ArrayList<>();
+   HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 3, 2, enableArchiveTrim, archiveFilesToKeep);
+   String archivePath = metaClient.getArchivePath();
+   for (int i = 1; i < 10; i++) {
+     testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+     // trigger archival
+     archiveAndGetCommitsList(writeConfig);
+     RemoteIterator<LocatedFileStatus> iter = metaClient.getFs().listFiles(new Path(archivePath), false);
+     ArrayList<String> files = new ArrayList<>();
+     while (iter.hasNext()) {
+       files.add(iter.next().getPath().toString());
+     }
+     archiveFilesExisted.addAll(files);
+     currentExistArchiveFiles = files;
+   }
+
+   assertEquals(archiveFilesToKeep, currentExistArchiveFiles.size());
+
+   if (enableArchiveTrim) {
+     // sort archive files path
+     List<String> sorted = archiveFilesExisted.stream().sorted().collect(Collectors.toList());
+     List<String> archiveFilesDeleted = sorted.subList(0, 3 - archiveFilesToKeep);
+     List<String> archiveFilesKept = sorted.subList(3 - archiveFilesToKeep, sorted.size());
+
+     // assert older archive files are deleted
+     assertFalse(currentExistArchiveFiles.containsAll(archiveFilesDeleted));
+     // assert most recent archive files are preserved
+     assertTrue(currentExistArchiveFiles.containsAll(archiveFilesKept));
+   }
Review comment:
Add a check when archive trim is disabled as well?
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -140,6 +146,48 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException
}
}
+ private void trimArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+   Stream<HoodieLogFile> allLogFiles = FSUtils.getAllLogFiles(metaClient.getFs(),
+       archiveFilePath.getParent(),
+       archiveFilePath.getName(),
+       HoodieArchivedLogFile.ARCHIVE_EXTENSION,
+       "");
+   List<HoodieLogFile> sortedLogFilesList = allLogFiles.sorted(HoodieLogFile.getReverseLogFileComparator()).collect(Collectors.toList());
+   if (!sortedLogFilesList.isEmpty()) {
+     List<String> skipped = sortedLogFilesList.stream().skip(maxArchiveFilesToKeep).map(HoodieLogFile::getPath).map(Path::toString).collect(Collectors.toList());
+     if (!skipped.isEmpty()) {
+       LOG.info("Deleting archive files : " + skipped);
+       context.setJobStatus(this.getClass().getSimpleName(), "Delete archive files");
+       Map<String, Boolean> result = deleteFilesParallelize(metaClient, skipped, context, true);
Review comment:
remove local variable assignment since it's not used?
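The trim logic under review boils down to "sort newest-first, skip the first N, delete the rest". A minimal, self-contained sketch of that selection step (plain Java with hypothetical file names, not Hudi's actual `HoodieLogFile` classes; Hudi uses a dedicated log-file comparator rather than plain string order):

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class TrimSketch {

  // Given file names whose lexicographic order happens to match age order,
  // keep the `maxToKeep` most recent files and return the rest for deletion.
  static List<String> filesToDelete(List<String> archiveFiles, int maxToKeep) {
    return archiveFiles.stream()
        .sorted(Comparator.reverseOrder()) // newest first
        .skip(maxToKeep)                   // everything past the keep window
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> files = Arrays.asList(
        ".commits_.archive.1", ".commits_.archive.2",
        ".commits_.archive.3", ".commits_.archive.4");
    // Keep the two newest; the two oldest are selected for deletion.
    System.out.println(filesToDelete(files, 2));
  }
}
```

Note that plain string ordering breaks once version suffixes reach double digits (`.10` sorts before `.9`), which is why the PR sorts with `HoodieLogFile.getReverseLogFileComparator()` instead.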
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]