This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 866348adb39e83e7908bc0bfa6d6c1a9dc7f2a89 Author: Sagar Sumit <[email protected]> AuthorDate: Fri Mar 8 23:47:41 2024 +0530 [ENG-6316] Bump cleaner retention for MDT (#537) (#10655) --- .../hudi/metadata/HoodieMetadataWriteUtils.java | 28 +++++++--- .../metadata/TestHoodieMetadataWriteUtils.java | 64 ++++++++++++++++++++++ 2 files changed, 84 insertions(+), 8 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java index 76fffd5d0df..48cfb46b49f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java @@ -82,6 +82,25 @@ public class HoodieMetadataWriteUtils { String tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX; final long maxLogFileSizeBytes = writeConfig.getMetadataConfig().getMaxLogFileSize(); + // Borrow the cleaner policy from the main table and adjust the cleaner policy based on the main table's cleaner policy + HoodieCleaningPolicy dataTableCleaningPolicy = writeConfig.getCleanerPolicy(); + HoodieCleanConfig.Builder cleanConfigBuilder = HoodieCleanConfig.newBuilder() + .withAsyncClean(DEFAULT_METADATA_ASYNC_CLEAN) + .withAutoClean(false) + .withCleanerParallelism(MDT_DEFAULT_PARALLELISM) + .withFailedWritesCleaningPolicy(failedWritesCleaningPolicy) + .withCleanerPolicy(dataTableCleaningPolicy); + + if (HoodieCleaningPolicy.KEEP_LATEST_COMMITS.equals(dataTableCleaningPolicy)) { + int retainCommits = (int) Math.max(DEFAULT_METADATA_CLEANER_COMMITS_RETAINED, writeConfig.getCleanerCommitsRetained() * 1.2); + cleanConfigBuilder.retainCommits(retainCommits); + } else if (HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.equals(dataTableCleaningPolicy)) { + int retainFileVersions = (int) Math.ceil(writeConfig.getCleanerFileVersionsRetained() * 1.2); + cleanConfigBuilder.retainFileVersions(retainFileVersions); + } else if (HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS.equals(dataTableCleaningPolicy)) { + int numHoursRetained = (int) Math.ceil(writeConfig.getCleanerHoursRetained() * 1.2); + cleanConfigBuilder.cleanerNumHoursRetained(numHoursRetained); + } // Create the write config for the metadata table by borrowing options from the main write config. HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder() @@ -105,14 +124,7 @@ public class HoodieMetadataWriteUtils { .withSchema(HoodieMetadataRecord.getClassSchema().toString()) .forTable(tableName) // we will trigger cleaning manually, to control the instant times - .withCleanConfig(HoodieCleanConfig.newBuilder() - .withAsyncClean(DEFAULT_METADATA_ASYNC_CLEAN) - .withAutoClean(false) - .withCleanerParallelism(MDT_DEFAULT_PARALLELISM) - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) - .withFailedWritesCleaningPolicy(failedWritesCleaningPolicy) - .retainCommits(DEFAULT_METADATA_CLEANER_COMMITS_RETAINED) - .build()) + .withCleanConfig(cleanConfigBuilder.build()) // we will trigger archive manually, to ensure only regular writer invokes it .withArchivalConfig(HoodieArchivalConfig.newBuilder() .archiveCommitsWith( diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java new file mode 100644 index 00000000000..529d2ddfc7f --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metadata; + +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.model.HoodieCleaningPolicy; +import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; +import org.apache.hudi.config.HoodieCleanConfig; +import org.apache.hudi.config.HoodieWriteConfig; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class TestHoodieMetadataWriteUtils { + + @Test + public void testCreateMetadataWriteConfigForCleaner() { + HoodieWriteConfig writeConfig1 = HoodieWriteConfig.newBuilder() + .withPath("/tmp") + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) + .retainCommits(5).build()) + .build(); + + HoodieWriteConfig metadataWriteConfig1 = HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig1, HoodieFailedWritesCleaningPolicy.EAGER); + assertEquals(HoodieFailedWritesCleaningPolicy.EAGER, metadataWriteConfig1.getFailedWritesCleanPolicy()); + assertEquals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS, metadataWriteConfig1.getCleanerPolicy()); + // default value already greater than data cleaner commits retained * 1.2 + assertEquals(HoodieMetadataConfig.DEFAULT_METADATA_CLEANER_COMMITS_RETAINED, metadataWriteConfig1.getCleanerCommitsRetained()); + + assertNotEquals(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, metadataWriteConfig1.getCleanerPolicy()); + assertNotEquals(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS, metadataWriteConfig1.getCleanerPolicy()); + + HoodieWriteConfig writeConfig2 = HoodieWriteConfig.newBuilder() + .withPath("/tmp") + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) + .retainCommits(20).build()) + .build(); + HoodieWriteConfig metadataWriteConfig2 = HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig2, HoodieFailedWritesCleaningPolicy.EAGER); + assertEquals(HoodieFailedWritesCleaningPolicy.EAGER, metadataWriteConfig2.getFailedWritesCleanPolicy()); + assertEquals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS, metadataWriteConfig2.getCleanerPolicy()); + // data cleaner commits retained * 1.2 is greater than default + assertEquals(24, metadataWriteConfig2.getCleanerCommitsRetained()); + } +}
