This is an automated email from the ASF dual-hosted git repository. github-bot pushed a commit to branch cherry-pick-eaff8f57-to-branch-1.2 in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit 2a5600ebd829ad55e71f37ad6bbbdf1c81e8124d Author: FANNG <[email protected]> AuthorDate: Mon Mar 9 12:34:11 2026 +0900 [#10097] feat(core): support builtin iceberg compaction policy (#10189) ### What changes were proposed in this pull request? This PR adds a built-in Iceberg compaction policy with typed content and aligns policy type handling with enum-based built-in types. 1. Added typed policy content for ICEBERG_COMPACTION (instead of CustomContent): - minDataFileMse - minDeleteFileNumber - dataFileMseWeight - deleteFileNumberWeight - maxPartitionNum - rewriteOptions (expanded to job.options.*) 2. Added built-in compaction defaults in policy content: - strategy.type=iceberg-data-compaction - job.template-name=builtin-iceberg-rewrite-data-files - built-in trigger/score expressions based on custom-data-file-mse and custom-delete-file-number 3. Added DTO/REST/converter support for typed content serialization/deserialization. 4. Added core-side built-in policy content type validation to prevent mismatched content types. 5. Aligned built-in policy type with enum and prefix: - removed obsolete TODO in Policy.BuiltInType - ICEBERG_COMPACTION.policyType() now uses system_iceberg_compaction - REST policy type output is aligned to enum name (ICEBERG_COMPACTION) ### Why are the changes needed? - Iceberg compaction is a core built-in policy and should be strongly typed, validated, and stable, rather than relying on generic CustomContent. - Strong typing improves correctness, validation, and compatibility across API/DTO/core/optimizer integration. - Enum + built-in prefix alignment standardizes policy type semantics and avoids ambiguous type representations. Fix: #10097 ### Does this PR introduce any user-facing change? Yes. 1. A built-in policy type for Iceberg compaction is available with typed content fields: - minDataFileMse - minDeleteFileNumber - dataFileMseWeight - deleteFileNumberWeight - maxPartitionNum - rewriteOptions 2. Built-in policy type representation is aligned: - built-in type value: system_iceberg_compaction - REST response policy type for this built-in policy: ICEBERG_COMPACTION 3. 
Built-in compaction policy now uses a fixed built-in job template: - builtin-iceberg-rewrite-data-files ### How was this patch tested? Added/updated unit tests and ran targeted test suites: - org.apache.gravitino.policy.TestPolicyBuiltInType - org.apache.gravitino.policy.TestPolicyContents - org.apache.gravitino.dto.policy.TestPolicyDTO - org.apache.gravitino.meta.TestPolicyEntity - org.apache.gravitino.server.web.rest.TestPolicyOperations Executed with Gradle (targeted modules/tests), all passed --- .../policy/IcebergDataCompactionContent.java | 280 +++++++++++++++++++++ .../java/org/apache/gravitino/policy/Policy.java | 9 +- .../apache/gravitino/policy/PolicyContents.java | 61 +++++ .../gravitino/policy/TestPolicyBuiltInType.java | 54 ++++ .../gravitino/policy/TestPolicyContents.java | 120 +++++++++ .../gravitino/dto/policy/PolicyContentDTO.java | 130 ++++++++++ .../org/apache/gravitino/dto/policy/PolicyDTO.java | 3 + .../dto/requests/PolicyCreateRequest.java | 3 + .../dto/requests/PolicyUpdateRequest.java | 3 + .../apache/gravitino/dto/util/DTOConverters.java | 26 ++ .../apache/gravitino/dto/policy/TestPolicyDTO.java | 64 +++++ .../org/apache/gravitino/meta/PolicyEntity.java | 6 + .../storage/relational/utils/POConverters.java | 4 +- .../apache/gravitino/meta/TestPolicyEntity.java | 38 +++ docs/iceberg-compaction-policy.md | 134 ++++++++++ docs/manage-policies-in-gravitino.md | 9 +- .../compaction/CompactionStrategyHandler.java | 3 +- .../recommender/strategy/GravitinoStrategy.java | 2 +- .../compaction/CompactionStrategyForTest.java | 2 +- .../compaction/TestCompactionStrategyHandler.java | 2 +- .../TestGravitinoPolicyCompactionStrategy.java | 94 +++++++ .../server/web/rest/PolicyOperations.java | 19 +- .../server/web/rest/TestPolicyOperations.java | 97 +++++++ 23 files changed, 1146 insertions(+), 17 deletions(-) diff --git a/api/src/main/java/org/apache/gravitino/policy/IcebergDataCompactionContent.java 
b/api/src/main/java/org/apache/gravitino/policy/IcebergDataCompactionContent.java new file mode 100644 index 0000000000..50f76b538f --- /dev/null +++ b/api/src/main/java/org/apache/gravitino/policy/IcebergDataCompactionContent.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.policy; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.MetadataObject; + +/** Built-in policy content for Iceberg compaction strategy. */ +public class IcebergDataCompactionContent implements PolicyContent { + /** Property key for strategy type. */ + public static final String STRATEGY_TYPE_KEY = "strategy.type"; + /** Strategy type value for iceberg data compaction. */ + public static final String STRATEGY_TYPE_VALUE = "iceberg-data-compaction"; + /** Property key for job template name. 
*/ + public static final String JOB_TEMPLATE_NAME_KEY = "job.template-name"; + /** Built-in job template name for Iceberg rewrite data files. */ + public static final String JOB_TEMPLATE_NAME_VALUE = "builtin-iceberg-rewrite-data-files"; + /** Prefix for rewrite options propagated to job options. */ + public static final String JOB_OPTIONS_PREFIX = "job.options."; + /** Rule key for trigger expression. */ + public static final String TRIGGER_EXPR_KEY = "trigger-expr"; + /** Rule key for score expression. */ + public static final String SCORE_EXPR_KEY = "score-expr"; + /** Rule key for minimum data file MSE threshold. */ + public static final String MIN_DATA_FILE_MSE_KEY = "minDataFileMse"; + /** Rule key for minimum delete file count threshold. */ + public static final String MIN_DELETE_FILE_NUMBER_KEY = "minDeleteFileNumber"; + /** Rule key for data file MSE score weight. */ + public static final String DATA_FILE_MSE_WEIGHT_KEY = "dataFileMseWeight"; + /** Rule key for delete file number score weight. */ + public static final String DELETE_FILE_NUMBER_WEIGHT_KEY = "deleteFileNumberWeight"; + /** Rule key for max partition number selected for compaction. */ + public static final String MAX_PARTITION_NUM_KEY = "max-partition-num"; + /** Metric name for data file MSE. */ + public static final String DATA_FILE_MSE_METRIC = "custom-data-file-mse"; + /** Metric name for delete file number. */ + public static final String DELETE_FILE_NUMBER_METRIC = "custom-delete-file-number"; + /** Default minimum threshold for data file MSE metric, equals (128 MiB * 0.15)^2. */ + public static final long DEFAULT_MIN_DATA_FILE_MSE = 405323966463344L; + /** Default minimum threshold for delete file number metric. */ + public static final long DEFAULT_MIN_DELETE_FILE_NUMBER = 1L; + /** Default score weight for data file MSE. */ + public static final long DEFAULT_DATA_FILE_MSE_WEIGHT = 1L; + /** Default score weight for delete file number. 
*/ + public static final long DEFAULT_DELETE_FILE_NUMBER_WEIGHT = 100L; + /** Default max partition number for compaction. */ + public static final long DEFAULT_MAX_PARTITION_NUM = 50L; + /** Default rewrite options for Iceberg rewrite data files. */ + public static final Map<String, String> DEFAULT_REWRITE_OPTIONS = ImmutableMap.of(); + + private static final Pattern OPTION_KEY_PATTERN = Pattern.compile("[A-Za-z0-9._-]+"); + private static final Set<MetadataObject.Type> SUPPORTED_OBJECT_TYPES = + ImmutableSet.of( + MetadataObject.Type.CATALOG, MetadataObject.Type.SCHEMA, MetadataObject.Type.TABLE); + private static final String TRIGGER_EXPR = + DATA_FILE_MSE_METRIC + + " >= " + + MIN_DATA_FILE_MSE_KEY + + " || " + + DELETE_FILE_NUMBER_METRIC + + " >= " + + MIN_DELETE_FILE_NUMBER_KEY; + private static final String SCORE_EXPR = + DATA_FILE_MSE_METRIC + + " * " + + DATA_FILE_MSE_WEIGHT_KEY + + " + " + + DELETE_FILE_NUMBER_METRIC + + " * " + + DELETE_FILE_NUMBER_WEIGHT_KEY; + + private final Long minDataFileMse; + private final Long minDeleteFileNumber; + private final Long dataFileMseWeight; + private final Long deleteFileNumberWeight; + private final Long maxPartitionNum; + private final Map<String, String> rewriteOptions; + + /** Default constructor for Jackson deserialization only. */ + private IcebergDataCompactionContent() { + this(null, null, null, null, null, null); + } + + IcebergDataCompactionContent( + Long minDataFileMse, + Long minDeleteFileNumber, + Long dataFileMseWeight, + Long deleteFileNumberWeight, + Long maxPartitionNum, + Map<String, String> rewriteOptions) { + // Nullable inputs are treated as "use default" to simplify policy creation. + this.minDataFileMse = minDataFileMse == null ? DEFAULT_MIN_DATA_FILE_MSE : minDataFileMse; + this.minDeleteFileNumber = + minDeleteFileNumber == null ? DEFAULT_MIN_DELETE_FILE_NUMBER : minDeleteFileNumber; + this.dataFileMseWeight = + dataFileMseWeight == null ? 
DEFAULT_DATA_FILE_MSE_WEIGHT : dataFileMseWeight; + this.deleteFileNumberWeight = + deleteFileNumberWeight == null ? DEFAULT_DELETE_FILE_NUMBER_WEIGHT : deleteFileNumberWeight; + this.maxPartitionNum = maxPartitionNum == null ? DEFAULT_MAX_PARTITION_NUM : maxPartitionNum; + this.rewriteOptions = + rewriteOptions == null + ? DEFAULT_REWRITE_OPTIONS + : Collections.unmodifiableMap(new LinkedHashMap<>(rewriteOptions)); + } + + /** + * Returns the minimum threshold for {@value DATA_FILE_MSE_METRIC}. + * + * @return minimum data file MSE threshold + */ + public Long minDataFileMse() { + return minDataFileMse; + } + + /** + * Returns the minimum threshold for {@value DELETE_FILE_NUMBER_METRIC}. + * + * @return minimum delete file number threshold + */ + public Long minDeleteFileNumber() { + return minDeleteFileNumber; + } + + /** + * Returns the weight used by {@value DATA_FILE_MSE_METRIC} in score expression. + * + * @return data file MSE score weight + */ + public Long dataFileMseWeight() { + return dataFileMseWeight; + } + + /** + * Returns the weight used by {@value DELETE_FILE_NUMBER_METRIC} in score expression. + * + * @return delete file number score weight + */ + public Long deleteFileNumberWeight() { + return deleteFileNumberWeight; + } + + /** + * Returns max partition number selected for compaction. + * + * @return max partition number + */ + public Long maxPartitionNum() { + return maxPartitionNum; + } + + /** + * Returns rewrite options that are expanded to {@code job.options.*} rule entries. + * + * @return rewrite options + */ + public Map<String, String> rewriteOptions() { + return rewriteOptions; + } + + @Override + public Set<MetadataObject.Type> supportedObjectTypes() { + return SUPPORTED_OBJECT_TYPES; + } + + @Override + public Map<String, String> properties() { + // properties keep stable strategy/job identity; thresholds and scoring knobs belong to rules. 
+ return ImmutableMap.of( + STRATEGY_TYPE_KEY, STRATEGY_TYPE_VALUE, JOB_TEMPLATE_NAME_KEY, JOB_TEMPLATE_NAME_VALUE); + } + + @Override + public Map<String, Object> rules() { + Map<String, Object> rules = new LinkedHashMap<>(); + rules.put(MIN_DATA_FILE_MSE_KEY, minDataFileMse); + rules.put(MIN_DELETE_FILE_NUMBER_KEY, minDeleteFileNumber); + rules.put(DATA_FILE_MSE_WEIGHT_KEY, dataFileMseWeight); + rules.put(DELETE_FILE_NUMBER_WEIGHT_KEY, deleteFileNumberWeight); + rules.put(MAX_PARTITION_NUM_KEY, maxPartitionNum); + rules.put(TRIGGER_EXPR_KEY, TRIGGER_EXPR); + rules.put(SCORE_EXPR_KEY, SCORE_EXPR); + rewriteOptions.forEach((key, value) -> rules.put(JOB_OPTIONS_PREFIX + key, value)); + return Collections.unmodifiableMap(rules); + } + + @Override + public void validate() throws IllegalArgumentException { + PolicyContent.super.validate(); + // All fields are defaulted in the constructor, so only range checks are needed here. + Preconditions.checkArgument(minDataFileMse >= 0, "minDataFileMse must be >= 0"); + Preconditions.checkArgument(minDeleteFileNumber >= 0, "minDeleteFileNumber must be >= 0"); + Preconditions.checkArgument(dataFileMseWeight >= 0, "dataFileMseWeight must be >= 0"); + Preconditions.checkArgument(deleteFileNumberWeight >= 0, "deleteFileNumberWeight must be >= 0"); + Preconditions.checkArgument(maxPartitionNum > 0, "maxPartitionNum must be > 0"); + + rewriteOptions.forEach( + (key, value) -> { + Preconditions.checkArgument(StringUtils.isNotBlank(key), "rewrite option key is blank"); + Preconditions.checkArgument( + OPTION_KEY_PATTERN.matcher(key).matches(), + "rewrite option key '%s' contains illegal characters", + key); + Preconditions.checkArgument( + !key.startsWith(JOB_OPTIONS_PREFIX), + "rewrite option key '%s' must not start with '%s'", + key, + JOB_OPTIONS_PREFIX); + Preconditions.checkArgument( + StringUtils.isNotBlank(value), "rewrite option '%s' must have non-empty value", key); + }); + } + + @Override + public boolean equals(Object o) { + 
if (!(o instanceof IcebergDataCompactionContent)) { + return false; + } + IcebergDataCompactionContent that = (IcebergDataCompactionContent) o; + return Objects.equals(minDataFileMse, that.minDataFileMse) + && Objects.equals(minDeleteFileNumber, that.minDeleteFileNumber) + && Objects.equals(dataFileMseWeight, that.dataFileMseWeight) + && Objects.equals(deleteFileNumberWeight, that.deleteFileNumberWeight) + && Objects.equals(maxPartitionNum, that.maxPartitionNum) + && Objects.equals(rewriteOptions, that.rewriteOptions); + } + + @Override + public int hashCode() { + return Objects.hash( + minDataFileMse, + minDeleteFileNumber, + dataFileMseWeight, + deleteFileNumberWeight, + maxPartitionNum, + rewriteOptions); + } + + @Override + public String toString() { + return "IcebergDataCompactionContent{" + + "minDataFileMse=" + + minDataFileMse + + ", minDeleteFileNumber=" + + minDeleteFileNumber + + ", dataFileMseWeight=" + + dataFileMseWeight + + ", deleteFileNumberWeight=" + + deleteFileNumberWeight + + ", maxPartitionNum=" + + maxPartitionNum + + ", rewriteOptions=" + + rewriteOptions + + '}'; + } +} diff --git a/api/src/main/java/org/apache/gravitino/policy/Policy.java b/api/src/main/java/org/apache/gravitino/policy/Policy.java index 4f0e7cd1e0..7b27b01fcd 100644 --- a/api/src/main/java/org/apache/gravitino/policy/Policy.java +++ b/api/src/main/java/org/apache/gravitino/policy/Policy.java @@ -39,9 +39,9 @@ public interface Policy extends Auditable { /** The built-in policy types. Predefined policy types that are provided by the system. */ enum BuiltInType { - // todo: add built-in policies, such as: - // DATA_COMPACTION(BUILT_IN_TYPE_PREFIX + "data_compaction", - // PolicyContent.DataCompactionContent.class) + /** Built-in policy type for Iceberg compaction strategy. */ + ICEBERG_COMPACTION( + BUILT_IN_TYPE_PREFIX + "iceberg_compaction", IcebergDataCompactionContent.class), /** * Custom policy type. 
"custom" is a fixed string that indicates the policy is a non-built-in @@ -61,7 +61,8 @@ public interface Policy extends Auditable { * Get the built-in policy type from the policy type string. * * @param policyType the policy type string - * @return the built-in policy type if it matches, otherwise returns CUSTOM type + * @return the matched built-in policy type + * @throws IllegalArgumentException if the policy type is unknown or blank */ public static BuiltInType fromPolicyType(String policyType) { Preconditions.checkArgument(StringUtils.isNotBlank(policyType), "policyType cannot be blank"); diff --git a/api/src/main/java/org/apache/gravitino/policy/PolicyContents.java b/api/src/main/java/org/apache/gravitino/policy/PolicyContents.java index 5922136ba3..4b6acc9b94 100644 --- a/api/src/main/java/org/apache/gravitino/policy/PolicyContents.java +++ b/api/src/main/java/org/apache/gravitino/policy/PolicyContents.java @@ -42,6 +42,67 @@ public class PolicyContents { return new CustomContent(rules, supportedObjectTypes, properties); } + /** + * Creates an iceberg compaction policy content with default values. + * + * @return iceberg compaction policy content with defaults + */ + public static PolicyContent icebergDataCompaction() { + return icebergDataCompaction( + IcebergDataCompactionContent.DEFAULT_MIN_DATA_FILE_MSE, + IcebergDataCompactionContent.DEFAULT_MIN_DELETE_FILE_NUMBER, + IcebergDataCompactionContent.DEFAULT_DATA_FILE_MSE_WEIGHT, + IcebergDataCompactionContent.DEFAULT_DELETE_FILE_NUMBER_WEIGHT, + IcebergDataCompactionContent.DEFAULT_MAX_PARTITION_NUM, + IcebergDataCompactionContent.DEFAULT_REWRITE_OPTIONS); + } + + /** + * Creates an iceberg compaction policy content. 
+ * + * @param minDataFileMse minimum threshold for custom-data-file-mse + * @param minDeleteFileNumber minimum threshold for custom-delete-file-number + * @param rewriteOptions rewrite options forwarded as job.options.* + * @return iceberg compaction policy content + */ + public static PolicyContent icebergDataCompaction( + long minDataFileMse, long minDeleteFileNumber, Map<String, String> rewriteOptions) { + return icebergDataCompaction( + minDataFileMse, + minDeleteFileNumber, + IcebergDataCompactionContent.DEFAULT_DATA_FILE_MSE_WEIGHT, + IcebergDataCompactionContent.DEFAULT_DELETE_FILE_NUMBER_WEIGHT, + IcebergDataCompactionContent.DEFAULT_MAX_PARTITION_NUM, + rewriteOptions); + } + + /** + * Creates an iceberg compaction policy content with configurable score weights. + * + * @param minDataFileMse minimum threshold for custom-data-file-mse + * @param minDeleteFileNumber minimum threshold for custom-delete-file-number + * @param dataFileMseWeight weight used for custom-data-file-mse score contribution + * @param deleteFileNumberWeight weight used for custom-delete-file-number score contribution + * @param maxPartitionNum maximum partition number selected for compaction + * @param rewriteOptions rewrite options forwarded as job.options.* + * @return iceberg compaction policy content + */ + public static PolicyContent icebergDataCompaction( + long minDataFileMse, + long minDeleteFileNumber, + long dataFileMseWeight, + long deleteFileNumberWeight, + long maxPartitionNum, + Map<String, String> rewriteOptions) { + return new IcebergDataCompactionContent( + minDataFileMse, + minDeleteFileNumber, + dataFileMseWeight, + deleteFileNumberWeight, + maxPartitionNum, + rewriteOptions); + } + private PolicyContents() {} /** diff --git a/api/src/test/java/org/apache/gravitino/policy/TestPolicyBuiltInType.java b/api/src/test/java/org/apache/gravitino/policy/TestPolicyBuiltInType.java new file mode 100644 index 0000000000..68d2065cf5 --- /dev/null +++ 
b/api/src/test/java/org/apache/gravitino/policy/TestPolicyBuiltInType.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.policy; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestPolicyBuiltInType { + + @Test + void testFromPolicyTypeWithBuiltInTypeValue() { + Assertions.assertEquals( + Policy.BuiltInType.ICEBERG_COMPACTION, + Policy.BuiltInType.fromPolicyType("system_iceberg_compaction")); + } + + @Test + void testFromPolicyTypeWithEnumName() { + IllegalArgumentException exception = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> Policy.BuiltInType.fromPolicyType("ICEBERG_COMPACTION")); + Assertions.assertTrue(exception.getMessage().contains("Unknown policy type")); + } + + @Test + void testBuiltInTypePolicyTypeValue() { + Assertions.assertEquals( + "system_iceberg_compaction", Policy.BuiltInType.ICEBERG_COMPACTION.policyType()); + } + + @Test + void testBuiltInTypeContentClass() { + Assertions.assertEquals( + IcebergDataCompactionContent.class, Policy.BuiltInType.ICEBERG_COMPACTION.contentClass()); + } +} diff --git a/api/src/test/java/org/apache/gravitino/policy/TestPolicyContents.java 
b/api/src/test/java/org/apache/gravitino/policy/TestPolicyContents.java new file mode 100644 index 0000000000..702515d14b --- /dev/null +++ b/api/src/test/java/org/apache/gravitino/policy/TestPolicyContents.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.policy; + +import com.google.common.collect.ImmutableSet; +import java.util.Map; +import org.apache.gravitino.MetadataObject; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestPolicyContents { + + @Test + void testIcebergCompactionContentUsesDefaults() { + IcebergDataCompactionContent content = + (IcebergDataCompactionContent) PolicyContents.icebergDataCompaction(); + + Assertions.assertEquals( + IcebergDataCompactionContent.DEFAULT_MIN_DATA_FILE_MSE, + content.rules().get("minDataFileMse")); + Assertions.assertEquals( + IcebergDataCompactionContent.DEFAULT_MIN_DELETE_FILE_NUMBER, + content.rules().get("minDeleteFileNumber")); + Assertions.assertEquals(1L, content.rules().get("dataFileMseWeight")); + Assertions.assertEquals(100L, content.rules().get("deleteFileNumberWeight")); + Assertions.assertEquals(50L, content.rules().get("max-partition-num")); + Assertions.assertNull(content.rules().get("job.options.target-file-size-bytes")); + Assertions.assertNull(content.rules().get("job.options.min-input-files")); + Assertions.assertNull(content.rules().get("job.options.delete-file-threshold")); + } + + @Test + void testIcebergCompactionContentGeneratesOptimizerFields() { + IcebergDataCompactionContent content = + (IcebergDataCompactionContent) + PolicyContents.icebergDataCompaction( + 1000L, 1L, Map.of("target-file-size-bytes", "1048576", "min-input-files", "1")); + + Assertions.assertEquals("iceberg-data-compaction", content.properties().get("strategy.type")); + Assertions.assertEquals( + "builtin-iceberg-rewrite-data-files", content.properties().get("job.template-name")); + Assertions.assertEquals(1000L, content.rules().get("minDataFileMse")); + Assertions.assertEquals(1L, content.rules().get("minDeleteFileNumber")); + Assertions.assertEquals(1L, content.rules().get("dataFileMseWeight")); + Assertions.assertEquals(100L, content.rules().get("deleteFileNumberWeight")); + 
Assertions.assertEquals(50L, content.rules().get("max-partition-num")); + Assertions.assertEquals( + "custom-data-file-mse >= minDataFileMse || custom-delete-file-number >= minDeleteFileNumber", + content.rules().get("trigger-expr")); + Assertions.assertEquals( + "custom-data-file-mse * dataFileMseWeight" + + " + custom-delete-file-number * deleteFileNumberWeight", + content.rules().get("score-expr")); + Assertions.assertEquals("1048576", content.rules().get("job.options.target-file-size-bytes")); + Assertions.assertEquals("1", content.rules().get("job.options.min-input-files")); + Assertions.assertEquals( + ImmutableSet.of( + MetadataObject.Type.CATALOG, MetadataObject.Type.SCHEMA, MetadataObject.Type.TABLE), + content.supportedObjectTypes()); + } + + @Test + void testIcebergCompactionContentSupportsCustomWeights() { + IcebergDataCompactionContent content = + (IcebergDataCompactionContent) + PolicyContents.icebergDataCompaction( + 1000L, + 1L, + 3L, + 200L, + 88L, + Map.of("target-file-size-bytes", "1048576", "min-input-files", "1")); + + Assertions.assertEquals(3L, content.rules().get("dataFileMseWeight")); + Assertions.assertEquals(200L, content.rules().get("deleteFileNumberWeight")); + Assertions.assertEquals(88L, content.rules().get("max-partition-num")); + Assertions.assertDoesNotThrow(content::validate); + } + + @Test + void testIcebergCompactionContentRejectsInvalidRewriteOptionKey() { + IcebergDataCompactionContent content = + (IcebergDataCompactionContent) + PolicyContents.icebergDataCompaction( + 1000L, 1L, Map.of("job.options.target-file-size-bytes", "1048576")); + + IllegalArgumentException exception = + Assertions.assertThrows(IllegalArgumentException.class, content::validate); + Assertions.assertTrue(exception.getMessage().contains("must not start with")); + } + + @Test + void testIcebergCompactionContentRejectsInvalidMaxPartitionNum() { + IcebergDataCompactionContent content = + (IcebergDataCompactionContent) + PolicyContents.icebergDataCompaction( 
+ 1000L, 1L, 2L, 10L, 0L, Map.of("target-file-size-bytes", "1048576")); + + IllegalArgumentException exception = + Assertions.assertThrows(IllegalArgumentException.class, content::validate); + Assertions.assertTrue(exception.getMessage().contains("maxPartitionNum")); + } +} diff --git a/common/src/main/java/org/apache/gravitino/dto/policy/PolicyContentDTO.java b/common/src/main/java/org/apache/gravitino/dto/policy/PolicyContentDTO.java index 2de0cbeadd..c9d6cb0f4b 100644 --- a/common/src/main/java/org/apache/gravitino/dto/policy/PolicyContentDTO.java +++ b/common/src/main/java/org/apache/gravitino/dto/policy/PolicyContentDTO.java @@ -19,6 +19,8 @@ package org.apache.gravitino.dto.policy; import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Collections; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; import lombok.AllArgsConstructor; @@ -26,7 +28,9 @@ import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.ToString; import org.apache.gravitino.MetadataObject; +import org.apache.gravitino.policy.IcebergDataCompactionContent; import org.apache.gravitino.policy.PolicyContent; +import org.apache.gravitino.policy.PolicyContents; /** Represents a Policy Content Data Transfer Object (DTO). */ public interface PolicyContentDTO extends PolicyContent { @@ -69,4 +73,130 @@ public interface PolicyContentDTO extends PolicyContent { return properties; } } + + /** Represents a typed iceberg compaction policy content DTO. 
*/ + @EqualsAndHashCode + @ToString + @Builder(setterPrefix = "with") + @AllArgsConstructor(access = lombok.AccessLevel.PRIVATE) + class IcebergCompactionContentDTO implements PolicyContentDTO { + + @JsonProperty("minDataFileMse") + private Long minDataFileMse; + + @JsonProperty("minDeleteFileNumber") + private Long minDeleteFileNumber; + + @JsonProperty("dataFileMseWeight") + private Long dataFileMseWeight; + + @JsonProperty("deleteFileNumberWeight") + private Long deleteFileNumberWeight; + + @JsonProperty("maxPartitionNum") + private Long maxPartitionNum; + + @JsonProperty("rewriteOptions") + private Map<String, String> rewriteOptions; + + // Default constructor for Jackson deserialization only. + private IcebergCompactionContentDTO() {} + + /** + * Returns the minimum threshold for custom-data-file-mse metric. + * + * @return minimum data file MSE threshold + */ + public Long minDataFileMse() { + return minDataFileMse == null + ? IcebergDataCompactionContent.DEFAULT_MIN_DATA_FILE_MSE + : minDataFileMse; + } + + /** + * Returns the minimum threshold for custom-delete-file-number metric. + * + * @return minimum delete file number threshold + */ + public Long minDeleteFileNumber() { + return minDeleteFileNumber == null + ? IcebergDataCompactionContent.DEFAULT_MIN_DELETE_FILE_NUMBER + : minDeleteFileNumber; + } + + /** + * Returns the weight for custom-data-file-mse metric in score expression. + * + * @return data file MSE score weight + */ + public Long dataFileMseWeight() { + return dataFileMseWeight == null + ? IcebergDataCompactionContent.DEFAULT_DATA_FILE_MSE_WEIGHT + : dataFileMseWeight; + } + + /** + * Returns the weight for custom-delete-file-number metric in score expression. + * + * @return delete file number score weight + */ + public Long deleteFileNumberWeight() { + return deleteFileNumberWeight == null + ? 
IcebergDataCompactionContent.DEFAULT_DELETE_FILE_NUMBER_WEIGHT + : deleteFileNumberWeight; + } + + /** + * Returns max partition number selected for compaction. + * + * @return max partition number + */ + public Long maxPartitionNum() { + return maxPartitionNum == null + ? IcebergDataCompactionContent.DEFAULT_MAX_PARTITION_NUM + : maxPartitionNum; + } + + /** + * Returns rewrite options expanded to job.options.* during rule generation. + * + * @return rewrite options map + */ + public Map<String, String> rewriteOptions() { + return rewriteOptions == null + ? IcebergDataCompactionContent.DEFAULT_REWRITE_OPTIONS + : Collections.unmodifiableMap(new LinkedHashMap<>(rewriteOptions)); + } + + @Override + public Set<MetadataObject.Type> supportedObjectTypes() { + return toDomainContent().supportedObjectTypes(); + } + + @Override + public Map<String, String> properties() { + return toDomainContent().properties(); + } + + @Override + public Map<String, Object> rules() { + return toDomainContent().rules(); + } + + @Override + public void validate() throws IllegalArgumentException { + PolicyContentDTO.super.validate(); + toDomainContent().validate(); + } + + private PolicyContent toDomainContent() { + return PolicyContents.icebergDataCompaction( + minDataFileMse(), + minDeleteFileNumber(), + dataFileMseWeight(), + deleteFileNumberWeight(), + maxPartitionNum(), + rewriteOptions()); + } + } } diff --git a/common/src/main/java/org/apache/gravitino/dto/policy/PolicyDTO.java b/common/src/main/java/org/apache/gravitino/dto/policy/PolicyDTO.java index d6c24dca24..51c1780879 100644 --- a/common/src/main/java/org/apache/gravitino/dto/policy/PolicyDTO.java +++ b/common/src/main/java/org/apache/gravitino/dto/policy/PolicyDTO.java @@ -55,6 +55,9 @@ public class PolicyDTO implements Policy { // add mappings for built-in types here // For example: @JsonSubTypes.Type(value = DataCompactionContent.class, name = // "system_data_compaction") + @JsonSubTypes.Type( + value = 
PolicyContentDTO.IcebergCompactionContentDTO.class, + name = "system_iceberg_compaction") }) private PolicyContentDTO content; diff --git a/common/src/main/java/org/apache/gravitino/dto/requests/PolicyCreateRequest.java b/common/src/main/java/org/apache/gravitino/dto/requests/PolicyCreateRequest.java index ba1f06ff30..158bbb3e43 100644 --- a/common/src/main/java/org/apache/gravitino/dto/requests/PolicyCreateRequest.java +++ b/common/src/main/java/org/apache/gravitino/dto/requests/PolicyCreateRequest.java @@ -59,6 +59,9 @@ public class PolicyCreateRequest implements RESTRequest { // add mappings for built-in types here // For example: @JsonSubTypes.Type(value = DataCompactionContent.class, name = // "system_data_compaction") + @JsonSubTypes.Type( + value = PolicyContentDTO.IcebergCompactionContentDTO.class, + name = "system_iceberg_compaction") }) private final PolicyContentDTO policyContent; diff --git a/common/src/main/java/org/apache/gravitino/dto/requests/PolicyUpdateRequest.java b/common/src/main/java/org/apache/gravitino/dto/requests/PolicyUpdateRequest.java index 035ff10490..2cc53c68d0 100644 --- a/common/src/main/java/org/apache/gravitino/dto/requests/PolicyUpdateRequest.java +++ b/common/src/main/java/org/apache/gravitino/dto/requests/PolicyUpdateRequest.java @@ -140,6 +140,9 @@ public interface PolicyUpdateRequest extends RESTRequest { // add mappings for built-in types here // For example: @JsonSubTypes.Type(value = DataCompactionContent.class, name = // "system_data_compaction") + @JsonSubTypes.Type( + value = PolicyContentDTO.IcebergCompactionContentDTO.class, + name = "system_iceberg_compaction") }) private final PolicyContentDTO newContent; diff --git a/common/src/main/java/org/apache/gravitino/dto/util/DTOConverters.java b/common/src/main/java/org/apache/gravitino/dto/util/DTOConverters.java index 8bd6bda85b..f793d28844 100644 --- a/common/src/main/java/org/apache/gravitino/dto/util/DTOConverters.java +++ 
b/common/src/main/java/org/apache/gravitino/dto/util/DTOConverters.java @@ -97,6 +97,7 @@ import org.apache.gravitino.job.SparkJobTemplate; import org.apache.gravitino.messaging.Topic; import org.apache.gravitino.model.Model; import org.apache.gravitino.model.ModelVersion; +import org.apache.gravitino.policy.IcebergDataCompactionContent; import org.apache.gravitino.policy.PolicyContent; import org.apache.gravitino.policy.PolicyContents; import org.apache.gravitino.rel.Column; @@ -563,6 +564,19 @@ public class DTOConverters { .build(); } + if (policyContent instanceof IcebergDataCompactionContent) { + IcebergDataCompactionContent icebergCompactionContent = + (IcebergDataCompactionContent) policyContent; + return PolicyContentDTO.IcebergCompactionContentDTO.builder() + .withMinDataFileMse(icebergCompactionContent.minDataFileMse()) + .withMinDeleteFileNumber(icebergCompactionContent.minDeleteFileNumber()) + .withDataFileMseWeight(icebergCompactionContent.dataFileMseWeight()) + .withDeleteFileNumberWeight(icebergCompactionContent.deleteFileNumberWeight()) + .withMaxPartitionNum(icebergCompactionContent.maxPartitionNum()) + .withRewriteOptions(icebergCompactionContent.rewriteOptions()) + .build(); + } + throw new IllegalArgumentException("Unsupported policy content: " + policyContent); } @@ -1295,6 +1309,18 @@ public class DTOConverters { customContentDTO.properties()); } + if (policyContentDTO instanceof PolicyContentDTO.IcebergCompactionContentDTO) { + PolicyContentDTO.IcebergCompactionContentDTO icebergCompactionContentDTO = + (PolicyContentDTO.IcebergCompactionContentDTO) policyContentDTO; + return PolicyContents.icebergDataCompaction( + icebergCompactionContentDTO.minDataFileMse(), + icebergCompactionContentDTO.minDeleteFileNumber(), + icebergCompactionContentDTO.dataFileMseWeight(), + icebergCompactionContentDTO.deleteFileNumberWeight(), + icebergCompactionContentDTO.maxPartitionNum(), + icebergCompactionContentDTO.rewriteOptions()); + } + throw new 
IllegalArgumentException( "Unsupported policy content type: " + policyContentDTO.getClass()); } diff --git a/common/src/test/java/org/apache/gravitino/dto/policy/TestPolicyDTO.java b/common/src/test/java/org/apache/gravitino/dto/policy/TestPolicyDTO.java index f1c2aeb7a1..7bbe897cfc 100644 --- a/common/src/test/java/org/apache/gravitino/dto/policy/TestPolicyDTO.java +++ b/common/src/test/java/org/apache/gravitino/dto/policy/TestPolicyDTO.java @@ -26,6 +26,7 @@ import java.util.Optional; import org.apache.gravitino.MetadataObject; import org.apache.gravitino.dto.AuditDTO; import org.apache.gravitino.json.JsonUtils; +import org.apache.gravitino.policy.IcebergDataCompactionContent; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -107,4 +108,67 @@ public class TestPolicyDTO { PolicyDTO deserPolicyDTO4 = JsonUtils.objectMapper().readValue(serJson, PolicyDTO.class); Assertions.assertEquals(Optional.of(true), deserPolicyDTO4.inherited()); } + + @Test + public void testIcebergCompactionPolicySerDe() throws JsonProcessingException { + AuditDTO audit = AuditDTO.builder().withCreator("user1").withCreateTime(Instant.now()).build(); + PolicyContentDTO.IcebergCompactionContentDTO typedContent = + PolicyContentDTO.IcebergCompactionContentDTO.builder() + .withMinDataFileMse(1000L) + .withMinDeleteFileNumber(1L) + .withDataFileMseWeight(2L) + .withDeleteFileNumberWeight(150L) + .withMaxPartitionNum(99L) + .withRewriteOptions( + ImmutableMap.of("target-file-size-bytes", "1048576", "min-input-files", "1")) + .build(); + + PolicyDTO policyDTO = + PolicyDTO.builder() + .withName("iceberg-compaction") + .withComment("typed policy") + .withPolicyType("system_iceberg_compaction") + .withEnabled(true) + .withContent(typedContent) + .withAudit(audit) + .build(); + + String serJson = JsonUtils.objectMapper().writeValueAsString(policyDTO); + PolicyDTO deserPolicyDTO = JsonUtils.objectMapper().readValue(serJson, PolicyDTO.class); + + 
Assertions.assertEquals(policyDTO, deserPolicyDTO); + Assertions.assertInstanceOf( + PolicyContentDTO.IcebergCompactionContentDTO.class, deserPolicyDTO.content()); + } + + @Test + public void testIcebergCompactionPolicyDefaultValues() throws JsonProcessingException { + String json = + "{" + + "\"name\":\"iceberg-compaction-default\"," + + "\"comment\":\"typed policy\"," + + "\"policyType\":\"system_iceberg_compaction\"," + + "\"enabled\":true," + + "\"content\":{}" + + "}"; + + PolicyDTO policyDTO = JsonUtils.objectMapper().readValue(json, PolicyDTO.class); + PolicyContentDTO.IcebergCompactionContentDTO contentDTO = + (PolicyContentDTO.IcebergCompactionContentDTO) policyDTO.content(); + + Assertions.assertEquals( + IcebergDataCompactionContent.DEFAULT_MIN_DATA_FILE_MSE, contentDTO.minDataFileMse()); + Assertions.assertEquals( + IcebergDataCompactionContent.DEFAULT_MIN_DELETE_FILE_NUMBER, + contentDTO.minDeleteFileNumber()); + Assertions.assertEquals( + IcebergDataCompactionContent.DEFAULT_DATA_FILE_MSE_WEIGHT, contentDTO.dataFileMseWeight()); + Assertions.assertEquals( + IcebergDataCompactionContent.DEFAULT_DELETE_FILE_NUMBER_WEIGHT, + contentDTO.deleteFileNumberWeight()); + Assertions.assertEquals( + IcebergDataCompactionContent.DEFAULT_MAX_PARTITION_NUM, contentDTO.maxPartitionNum()); + Assertions.assertTrue(contentDTO.rewriteOptions().isEmpty()); + Assertions.assertDoesNotThrow(contentDTO::validate); + } } diff --git a/core/src/main/java/org/apache/gravitino/meta/PolicyEntity.java b/core/src/main/java/org/apache/gravitino/meta/PolicyEntity.java index 6a1070489f..2c122c3ca8 100644 --- a/core/src/main/java/org/apache/gravitino/meta/PolicyEntity.java +++ b/core/src/main/java/org/apache/gravitino/meta/PolicyEntity.java @@ -163,6 +163,12 @@ public class PolicyEntity implements Entity, Auditable, HasIdentifier { || content() instanceof PolicyContents.CustomContent, "Expected CustomContent for custom policy type, but got %s", content().getClass().getName()); + 
Preconditions.checkArgument( + policyType == Policy.BuiltInType.CUSTOM || policyType.contentClass().isInstance(content()), + "Expected %s for policy type %s, but got %s", + policyType.contentClass().getSimpleName(), + policyType.name(), + content().getClass().getName()); content().validate(); } diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java index 95344a8f89..71ce8b42a2 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java @@ -820,7 +820,7 @@ public class POConverters { return PolicyPO.builder() .withPolicyId(newPolicy.id()) .withPolicyName(newPolicy.name()) - .withPolicyType(newPolicy.policyType().name()) + .withPolicyType(newPolicy.policyType().policyType()) .withMetalakeId(oldPolicyPO.getMetalakeId()) .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(newPolicy.auditInfo())) .withCurrentVersion(currentVersion) @@ -1512,7 +1512,7 @@ public class POConverters { return builder .withPolicyId(policyEntity.id()) .withPolicyName(policyEntity.name()) - .withPolicyType(policyEntity.policyType().name()) + .withPolicyType(policyEntity.policyType().policyType()) .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(policyEntity.auditInfo())) .withCurrentVersion(INIT_VERSION) .withLastVersion(INIT_VERSION) diff --git a/core/src/test/java/org/apache/gravitino/meta/TestPolicyEntity.java b/core/src/test/java/org/apache/gravitino/meta/TestPolicyEntity.java index 94fd8a86cb..5152d14429 100644 --- a/core/src/test/java/org/apache/gravitino/meta/TestPolicyEntity.java +++ b/core/src/test/java/org/apache/gravitino/meta/TestPolicyEntity.java @@ -117,4 +117,42 @@ public class TestPolicyEntity { AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build()) .build()); } + + @Test + public void 
testBuiltInPolicyContentTypeValidation() { + Namespace namespace = Namespace.of("m1", "c1", "s1"); + AuditInfo auditInfo = + AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build(); + + PolicyContent icebergCompactionContent = + PolicyContents.icebergDataCompaction( + 1000L, 1L, Map.of("target-file-size-bytes", "1048576")); + Assertions.assertDoesNotThrow( + () -> + PolicyEntity.builder() + .withId(1L) + .withName("iceberg-compaction") + .withNamespace(namespace) + .withPolicyType(Policy.BuiltInType.ICEBERG_COMPACTION) + .withEnabled(true) + .withContent(icebergCompactionContent) + .withAuditInfo(auditInfo) + .build()); + + PolicyContent customContent = + PolicyContents.custom( + ImmutableMap.of("k", "v"), ImmutableSet.of(MetadataObject.Type.TABLE), Map.of()); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + PolicyEntity.builder() + .withId(1L) + .withName("iceberg-compaction") + .withNamespace(namespace) + .withPolicyType(Policy.BuiltInType.ICEBERG_COMPACTION) + .withEnabled(true) + .withContent(customContent) + .withAuditInfo(auditInfo) + .build()); + } } diff --git a/docs/iceberg-compaction-policy.md b/docs/iceberg-compaction-policy.md new file mode 100644 index 0000000000..b68edf0511 --- /dev/null +++ b/docs/iceberg-compaction-policy.md @@ -0,0 +1,134 @@ +--- +title: "Iceberg compaction policy" +slug: /iceberg-compaction-policy +date: 2026-03-05 +keyword: iceberg, compaction, policy, optimizer, Gravitino +license: This software is licensed under the Apache License version 2. +--- + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + +## Overview + +`system_iceberg_compaction` is a built-in policy type used by the optimizer to generate compaction strategies and job contexts for Iceberg tables. + +This policy supports `CATALOG`, `SCHEMA`, and `TABLE` metadata objects. 
+ ## Policy content + +The typed content for `system_iceberg_compaction` supports the following fields: + +| Field | Required | Default | Description | +|---|---|---|---| +| `minDataFileMse` | No | `405323966463344` | Minimum threshold for the metric `custom-data-file-mse`. Must be `>= 0`. | +| `minDeleteFileNumber` | No | `1` | Minimum threshold for the metric `custom-delete-file-number`. Must be `>= 0`. | +| `dataFileMseWeight` | No | `1` | Score weight of `custom-data-file-mse`. Must be `>= 0`. | +| `deleteFileNumberWeight` | No | `100` | Score weight of `custom-delete-file-number`. Must be `>= 0`. | +| `maxPartitionNum` | No | `50` | Maximum number of partitions selected by the optimizer. Must be `> 0`. | +| `rewriteOptions` | No | `{}` | Additional rewrite options, expanded as `job.options.*` rules. | + +## Generated rules and properties + +The policy content is converted to: + +- Properties: + - `strategy.type=iceberg-data-compaction` + - `job.template-name=builtin-iceberg-rewrite-data-files` +- Rules: + - `trigger-expr=custom-data-file-mse >= minDataFileMse || custom-delete-file-number >= minDeleteFileNumber` + - `score-expr=custom-data-file-mse * dataFileMseWeight + custom-delete-file-number * deleteFileNumberWeight` + - `max-partition-num=<maxPartitionNum>` + - `job.options.<key>=<value>` for each rewrite option + +## Parameter tuning guide + +### Metric unit and threshold formula + +`custom-data-file-mse` is expected to be in `byte^2`. + +Use the target file size and a tolerance ratio to set `minDataFileMse`: + +`minDataFileMse = (target-file-size-bytes * ratio)^2` + +Because the metric is squared, this threshold corresponds to a root-mean-square deviation of `ratio` times the target file size. Recommended `ratio` range: `0.1` to `0.2`. + +The default `minDataFileMse` is derived from: + +- `target-file-size-bytes = 134217728` (128 MiB) +- `ratio = 0.15` +- `minDataFileMse = 405323966463344` + +### Trigger behavior + +The trigger expression uses `>=`. + +- Set `minDeleteFileNumber = 1` to trigger when at least one delete file exists. Avoid `0`: `custom-delete-file-number >= 0` is always true, so every table or partition would trigger. +- Set `minDeleteFileNumber > 1` to reduce compaction frequency for delete files.
+ +### Score weights + +Score is computed as: + +`custom-data-file-mse * dataFileMseWeight + custom-delete-file-number * deleteFileNumberWeight` + +- Keep `dataFileMseWeight = 1` as baseline. +- Increase `deleteFileNumberWeight` if you want partitions with more delete files to be prioritized. +- Keep both weights non-negative. + +### Recommended defaults for production start + +- `minDataFileMse = 405323966463344` (computed from 128 MiB and ratio `0.15`) +- `minDeleteFileNumber = 1` +- `dataFileMseWeight = 1` +- `deleteFileNumberWeight = 100` +- `maxPartitionNum = 50` + +Recommended `rewriteOptions`: + +- `target-file-size-bytes = 134217728` +- `min-input-files = 5` +- `delete-file-threshold = 1` + +## Create policy examples + +<Tabs groupId='language' queryString> +<TabItem value="shell" label="Shell"> + +```shell +curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \ + -H "Content-Type: application/json" \ + -d '{ + "name": "iceberg_compaction_default", + "comment": "Built-in iceberg compaction policy", + "policyType": "system_iceberg_compaction", + "enabled": true, + "content": {} + }' \ + http://localhost:8090/api/metalakes/test/policies +``` + +</TabItem> +<TabItem value="java" label="Java"> + +```java +GravitinoClient client = ...; + +PolicyContent content = PolicyContents.icebergDataCompaction(); + +Policy policy = + client.createPolicy( + "iceberg_compaction_default", + "system_iceberg_compaction", + "Built-in iceberg compaction policy", + true, + content); +``` + +</TabItem> +</Tabs> + +## Attach policy to metadata objects + +After the policy is created, associate it with a catalog, schema, or table through standard policy association APIs. +The optimizer will read the generated rules and properties to evaluate strategy triggering and job submission context. 
diff --git a/docs/manage-policies-in-gravitino.md b/docs/manage-policies-in-gravitino.md index b4c6c7f1b7..5eb848780f 100644 --- a/docs/manage-policies-in-gravitino.md +++ b/docs/manage-policies-in-gravitino.md @@ -42,8 +42,8 @@ Javadoc and REST API documentation. The first step to managing policies is to create new policies. You can create a new policy by providing a policy name, type, and other optional fields like comment, enabled, etc. -Gravitino supports two kinds of policies: built-in policies and custom policies. -For built-in policies, the `policyType` starts with `system.` and the `supportedObjectTypes` in the policy content is predefined. +Gravitino supports two kinds of policies: built-in policies and custom policies. +For built-in policies, the `policyType` starts with `system_` and the `supportedObjectTypes` in the policy content is predefined. For custom policies, the `policyType` must be `custom` and the `supportedObjectTypes` can be any combination of metadata object types. :::note @@ -104,6 +104,10 @@ Policy policy = client.createPolicy( </TabItem> </Tabs> +### Built-in Iceberg compaction policy + +For the built-in `system_iceberg_compaction` policy content, field definitions, and examples, see [Iceberg compaction policy](./iceberg-compaction-policy.md). + ### List created policies You can list all the created policy names as well as policy objects in a metalake in Gravitino. 
@@ -465,4 +469,3 @@ int count = policy.associatedObjects().count(); </TabItem> </Tabs> - diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionStrategyHandler.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionStrategyHandler.java index 774e2a8119..5a5c66d65e 100644 --- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionStrategyHandler.java +++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionStrategyHandler.java @@ -28,6 +28,7 @@ import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath; import org.apache.gravitino.maintenance.optimizer.api.common.Strategy; import org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext; import org.apache.gravitino.maintenance.optimizer.recommender.handler.BaseExpressionStrategyHandler; +import org.apache.gravitino.policy.IcebergDataCompactionContent; import org.apache.gravitino.rel.Table; /** @@ -35,7 +36,7 @@ import org.apache.gravitino.rel.Table; */ public class CompactionStrategyHandler extends BaseExpressionStrategyHandler { - public static final String NAME = "compaction"; + public static final String NAME = IcebergDataCompactionContent.STRATEGY_TYPE_VALUE; @Override public Set<DataRequirement> dataRequirements() { diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/strategy/GravitinoStrategy.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/strategy/GravitinoStrategy.java index a66bb43b7a..e0708bdb12 100644 --- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/strategy/GravitinoStrategy.java +++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/strategy/GravitinoStrategy.java @@ -43,7 +43,7 @@ public class GravitinoStrategy implements PartitionStrategy { /** Rule key for the partition table score aggregation mode. */ public static final String PARTITION_TABLE_SCORE_MODE = "partition_table_score_mode"; /** Rule key for the maximum number of partitions selected for execution. */ - public static final String MAX_PARTITION_NUM = "max_partition_num"; + public static final String MAX_PARTITION_NUM = "max-partition-num"; private static final int DEFAULT_MAX_PARTITION_NUM = 100; diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionStrategyForTest.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionStrategyForTest.java index 1934b0f215..04be5fc21d 100644 --- a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionStrategyForTest.java +++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/CompactionStrategyForTest.java @@ -35,7 +35,7 @@ public class CompactionStrategyForTest implements Strategy { @Override public String strategyType() { - return "compaction"; + return CompactionStrategyHandler.NAME; } @Override diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/TestCompactionStrategyHandler.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/TestCompactionStrategyHandler.java index a236554fa4..231204b696 100644 --- a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/TestCompactionStrategyHandler.java +++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/TestCompactionStrategyHandler.java @@ -435,7 +435,7 @@ class TestCompactionStrategyHandler { @Override public String strategyType() { - return "compaction"; + return CompactionStrategyHandler.NAME; } @Override diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/TestGravitinoPolicyCompactionStrategy.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/TestGravitinoPolicyCompactionStrategy.java new file mode 100644 index 0000000000..f36c28702e --- /dev/null +++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/handler/compaction/TestGravitinoPolicyCompactionStrategy.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction; + +import java.util.List; +import java.util.Map; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry; +import org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyEvaluation; +import org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyHandlerContext; +import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl; +import org.apache.gravitino.maintenance.optimizer.recommender.strategy.GravitinoStrategy; +import org.apache.gravitino.policy.Policy; +import org.apache.gravitino.policy.PolicyContent; +import org.apache.gravitino.policy.PolicyContents; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.Table; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.stats.StatisticValues; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +class TestGravitinoPolicyCompactionStrategy { + + @Test + void testPolicyGeneratesExpectedStrategyAndJobContext() { + PolicyContent content = + PolicyContents.icebergDataCompaction( + 1000L, + 1L, + 2L, + 10L, + 20L, + Map.of("target-file-size-bytes", "1048576", "min-input-files", "4")); + Policy policy = Mockito.mock(Policy.class); + Mockito.when(policy.name()).thenReturn("iceberg-compaction-policy"); + Mockito.when(policy.content()).thenReturn(content); + + GravitinoStrategy strategy = new GravitinoStrategy(policy); + Assertions.assertEquals(CompactionStrategyHandler.NAME, strategy.strategyType()); + Assertions.assertEquals("builtin-iceberg-rewrite-data-files", strategy.jobTemplateName()); + Assertions.assertEquals( + Map.of("target-file-size-bytes", "1048576", "min-input-files", "4"), strategy.jobOptions()); + + NameIdentifier tableId = NameIdentifier.of("db", "table"); + Table tableMetadata = 
Mockito.mock(Table.class); + Mockito.when(tableMetadata.partitioning()).thenReturn(new Transform[0]); + Mockito.when(tableMetadata.columns()).thenReturn(new Column[0]); + List<StatisticEntry<?>> tableStatistics = + List.of( + new StatisticEntryImpl("custom-data-file-mse", StatisticValues.longValue(3000L)), + new StatisticEntryImpl("custom-delete-file-number", StatisticValues.longValue(5L))); + + StrategyHandlerContext context = + StrategyHandlerContext.builder(tableId, strategy) + .withTableMetadata(tableMetadata) + .withTableStatistics(tableStatistics) + .build(); + + CompactionStrategyHandler handler = new CompactionStrategyHandler(); + handler.initialize(context); + + Assertions.assertTrue(handler.shouldTrigger()); + + StrategyEvaluation evaluation = handler.evaluate(); + Assertions.assertEquals(6050L, evaluation.score()); + CompactionJobContext jobContext = + (CompactionJobContext) evaluation.jobExecutionContext().orElseThrow(); + Assertions.assertEquals(tableId, jobContext.nameIdentifier()); + Assertions.assertEquals("builtin-iceberg-rewrite-data-files", jobContext.jobTemplateName()); + Assertions.assertEquals( + Map.of("target-file-size-bytes", "1048576", "min-input-files", "4"), + jobContext.jobOptions()); + Assertions.assertTrue(jobContext.getPartitions().isEmpty()); + } +} diff --git a/server/src/main/java/org/apache/gravitino/server/web/rest/PolicyOperations.java b/server/src/main/java/org/apache/gravitino/server/web/rest/PolicyOperations.java index e1202e5c18..5e73e91236 100644 --- a/server/src/main/java/org/apache/gravitino/server/web/rest/PolicyOperations.java +++ b/server/src/main/java/org/apache/gravitino/server/web/rest/PolicyOperations.java @@ -23,7 +23,6 @@ import static org.apache.gravitino.dto.util.DTOConverters.fromDTO; import com.codahale.metrics.annotation.ResponseMetered; import com.codahale.metrics.annotation.Timed; import java.util.Arrays; -import java.util.Locale; import java.util.Optional; import javax.inject.Inject; import 
javax.servlet.http.HttpServletRequest; @@ -149,6 +148,7 @@ public class PolicyOperations { httpRequest, () -> { request.validate(); + validateCreatePolicyType(request.getPolicyType()); PolicyEntity policy = policyDispatcher.createPolicy( metalake, @@ -334,17 +334,28 @@ public class PolicyOperations { } static PolicyDTO toDTO(PolicyEntity policy, Optional<Boolean> inherited) { + String policyType = policy.policyType().policyType(); PolicyDTO.Builder builder = PolicyDTO.builder() .withName(policy.name()) .withComment(policy.comment()) - .withPolicyType(policy.policyType().name().toLowerCase(Locale.ROOT)) + .withPolicyType(policyType) .withEnabled(policy.enabled()) .withContent(DTOConverters.toDTO(policy.content())) .withInherited(inherited) - .withAudit(DTOConverters.toDTO(policy.auditInfo())) - .withInherited(inherited); + .withAudit(DTOConverters.toDTO(policy.auditInfo())); return builder.build(); } + + private static void validateCreatePolicyType(String policyType) { + Policy.BuiltInType builtInType = Policy.BuiltInType.fromPolicyType(policyType); + if (builtInType != Policy.BuiltInType.CUSTOM + && !builtInType.policyType().equalsIgnoreCase(policyType)) { + throw new IllegalArgumentException( + String.format( + "Built-in policy type must use prefixed value '%s', but got: %s", + builtInType.policyType(), policyType)); + } + } } diff --git a/server/src/test/java/org/apache/gravitino/server/web/rest/TestPolicyOperations.java b/server/src/test/java/org/apache/gravitino/server/web/rest/TestPolicyOperations.java index 5788583b5a..af8056b666 100644 --- a/server/src/test/java/org/apache/gravitino/server/web/rest/TestPolicyOperations.java +++ b/server/src/test/java/org/apache/gravitino/server/web/rest/TestPolicyOperations.java @@ -44,6 +44,7 @@ import org.apache.gravitino.Config; import org.apache.gravitino.GravitinoEnv; import org.apache.gravitino.MetadataObject; import org.apache.gravitino.MetadataObjects; +import org.apache.gravitino.dto.policy.PolicyContentDTO; import 
org.apache.gravitino.dto.requests.PolicyCreateRequest; import org.apache.gravitino.dto.requests.PolicySetRequest; import org.apache.gravitino.dto.requests.PolicyUpdateRequest; @@ -299,6 +300,7 @@ public class TestPolicyOperations extends BaseOperationsTest { Policy respPolicy = policyResp.getPolicy(); Assertions.assertEquals(policy1.name(), respPolicy.name()); Assertions.assertEquals(policy1.comment(), respPolicy.comment()); + Assertions.assertEquals("custom", respPolicy.policyType()); Assertions.assertEquals(Optional.empty(), respPolicy.inherited()); // Test throw PolicyAlreadyExistsException @@ -337,6 +339,101 @@ public class TestPolicyOperations extends BaseOperationsTest { Assertions.assertEquals(RuntimeException.class.getSimpleName(), errorResp1.getType()); } + @Test + public void testCreatePolicyWithIcebergCompactionType() { + PolicyContent content = + PolicyContents.icebergDataCompaction( + 1000L, + 1L, + ImmutableMap.of("target-file-size-bytes", "1048576", "min-input-files", "1")); + PolicyEntity policy1 = + PolicyEntity.builder() + .withId(1L) + .withName("iceberg-compaction") + .withPolicyType(Policy.BuiltInType.ICEBERG_COMPACTION) + .withEnabled(true) + .withContent(content) + .withAuditInfo(testAuditInfo1) + .build(); + when(policyManager.createPolicy( + metalake, + "iceberg-compaction", + Policy.BuiltInType.ICEBERG_COMPACTION, + null, + true, + content)) + .thenReturn(policy1); + + PolicyCreateRequest request = + new PolicyCreateRequest( + "iceberg-compaction", "system_iceberg_compaction", null, true, toDTO(content)); + Response resp = + target(policyPath(metalake)) + .request(MediaType.APPLICATION_JSON_TYPE) + .accept("application/vnd.gravitino.v1+json") + .post(Entity.entity(request, MediaType.APPLICATION_JSON_TYPE)); + + Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); + PolicyResponse policyResp = resp.readEntity(PolicyResponse.class); + Assertions.assertEquals(0, policyResp.getCode()); + 
Assertions.assertEquals("system_iceberg_compaction", policyResp.getPolicy().policyType()); + } + + @Test + public void testCreatePolicyWithIcebergCompactionTypeDefaults() { + PolicyContent content = PolicyContents.icebergDataCompaction(); + PolicyEntity policy = + PolicyEntity.builder() + .withId(1L) + .withName("iceberg-compaction-default") + .withPolicyType(Policy.BuiltInType.ICEBERG_COMPACTION) + .withEnabled(true) + .withContent(content) + .withAuditInfo(testAuditInfo1) + .build(); + when(policyManager.createPolicy( + metalake, + "iceberg-compaction-default", + Policy.BuiltInType.ICEBERG_COMPACTION, + null, + true, + content)) + .thenReturn(policy); + + PolicyContentDTO.IcebergCompactionContentDTO minimalContent = + PolicyContentDTO.IcebergCompactionContentDTO.builder().build(); + PolicyCreateRequest request = + new PolicyCreateRequest( + "iceberg-compaction-default", "system_iceberg_compaction", null, true, minimalContent); + + Response resp = + target(policyPath(metalake)) + .request(MediaType.APPLICATION_JSON_TYPE) + .accept("application/vnd.gravitino.v1+json") + .post(Entity.entity(request, MediaType.APPLICATION_JSON_TYPE)); + + Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); + PolicyResponse policyResp = resp.readEntity(PolicyResponse.class); + Assertions.assertEquals(0, policyResp.getCode()); + Assertions.assertEquals("system_iceberg_compaction", policyResp.getPolicy().policyType()); + } + + @Test + public void testCreatePolicyWithIcebergCompactionEnumTypeRejected() { + PolicyContent content = PolicyContents.icebergDataCompaction(); + PolicyCreateRequest request = + new PolicyCreateRequest( + "iceberg-compaction", "ICEBERG_COMPACTION", null, true, toDTO(content)); + + Response resp = + target(policyPath(metalake)) + .request(MediaType.APPLICATION_JSON_TYPE) + .accept("application/vnd.gravitino.v1+json") + .post(Entity.entity(request, MediaType.APPLICATION_JSON_TYPE)); + + 
Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), resp.getStatus()); + } + @Test public void testGetPolicy() { ImmutableMap<String, Object> contentFields = ImmutableMap.of("target_file_size_bytes", 1000);
