This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f4d524e5ec [core] Introduce custom PartitionExpireStrategy (#5625)
f4d524e5ec is described below
commit f4d524e5ec6ab0eb2dd7572be6a08702cb73b81c
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon May 19 19:37:15 2025 +0800
[core] Introduce custom PartitionExpireStrategy (#5625)
---
.../shortcodes/generated/core_configuration.html | 2 +-
.../main/java/org/apache/paimon/CoreOptions.java | 4 +-
.../org/apache/paimon/factories/FactoryUtil.java | 33 +++++++++++
.../java/org/apache/paimon/AbstractFileStore.java | 7 ++-
.../paimon/partition/PartitionExpireStrategy.java | 17 +++++-
.../partition/PartitionExpireStrategyFactory.java | 41 +++++++++++++
.../CustomPartitionExpirationFactory.java | 56 ++++++++++++++++++
.../paimon/partition/PartitionExpireTableTest.java | 68 ++++++++++++++++++++++
...paimon.partition.PartitionExpireStrategyFactory | 19 ++++++
.../flink/action/ExpirePartitionsAction.java | 6 +-
10 files changed, 247 insertions(+), 6 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index c2d91ea775..0de4bf5794 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -699,7 +699,7 @@ This config option does not affect the default filesystem
metastore.</td>
<td><h5>partition.expiration-strategy</h5></td>
<td style="word-wrap: break-word;">values-time</td>
<td><p>Enum</p></td>
- <td>The strategy determines how to extract the partition time and
compare it with the current time.<br /><br />Possible
values:<ul><li>"values-time": This strategy compares the time extracted from
the partition value with the current time.</li><li>"update-time": This strategy
compares the last update time of the partition with the current
time.</li></ul></td>
+ <td>The strategy determines how to extract the partition time and
compare it with the current time.<br /><br />Possible
values:<ul><li>"values-time": This strategy compares the time extracted from
the partition value with the current time.</li><li>"update-time": This strategy
compares the last update time of the partition with the current
time.</li><li>"custom": This strategy use custom class to expire
partitions.</li></ul></td>
</tr>
<tr>
<td><h5>partition.expiration-time</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 56cb707742..28e7acda81 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -3248,7 +3248,9 @@ public class CoreOptions implements Serializable {
UPDATE_TIME(
"update-time",
- "This strategy compares the last update time of the partition
with the current time.");
+ "This strategy compares the last update time of the partition
with the current time."),
+
+ CUSTOM("custom", "This strategy use custom class to expire
partitions.");
private final String value;
diff --git
a/paimon-api/src/main/java/org/apache/paimon/factories/FactoryUtil.java
b/paimon-api/src/main/java/org/apache/paimon/factories/FactoryUtil.java
index a492aeece7..8431c77872 100644
--- a/paimon-api/src/main/java/org/apache/paimon/factories/FactoryUtil.java
+++ b/paimon-api/src/main/java/org/apache/paimon/factories/FactoryUtil.java
@@ -140,4 +140,37 @@ public class FactoryUtil {
return loadResults;
}
+
+ /**
+ * Discover a singleton factory.
+ *
+ * @param classLoader the class loader
+ * @param klass the klass
+ * @param <T> the type of the factory
+ * @return the factory
+ */
+ public static <T> T discoverSingletonFactory(ClassLoader classLoader,
Class<T> klass) {
+ List<T> factories = FactoryUtil.discoverFactories(classLoader, klass);
+ if (factories.isEmpty()) {
+ throw new FactoryException(
+ String.format(
+ "Could not find any factories that implement '%s'
in the classpath.",
+ klass.getName()));
+ }
+
+ if (factories.size() > 1) {
+ throw new FactoryException(
+ String.format(
+ "Multiple factories that implement '%s' found in
the classpath.\n\n"
+ + "Ambiguous factory classes are:\n\n"
+ + "%s",
+ klass.getName(),
+ factories.stream()
+ .map(f -> f.getClass().getName())
+ .sorted()
+ .collect(Collectors.joining("\n"))));
+ }
+
+ return factories.get(0);
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 929d760b0b..65e35e4345 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -79,6 +79,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import static
org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
@@ -423,7 +424,11 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
table,
partitionExpireTime,
options.partitionExpireCheckInterval(),
- PartitionExpireStrategy.createPartitionExpireStrategy(options,
partitionType()));
+ createPartitionExpireStrategy(
+ options,
+ partitionType(),
+ catalogEnvironment.catalogLoader(),
+ catalogEnvironment.identifier()));
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java
index 2def5b43a9..7d016c0931 100644
---
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java
+++
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java
@@ -19,12 +19,16 @@
package org.apache.paimon.partition;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.CatalogLoader;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+import javax.annotation.Nullable;
+
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.LinkedHashMap;
@@ -66,13 +70,22 @@ public abstract class PartitionExpireStrategy {
FileStoreScan scan, LocalDateTime expirationTime);
public static PartitionExpireStrategy createPartitionExpireStrategy(
- CoreOptions options, RowType partitionType) {
+ CoreOptions options,
+ RowType partitionType,
+ @Nullable CatalogLoader catalogLoader,
+ @Nullable Identifier identifier) {
switch (options.partitionExpireStrategy()) {
case UPDATE_TIME:
return new PartitionUpdateTimeExpireStrategy(partitionType);
case VALUES_TIME:
- default:
return new PartitionValuesTimeExpireStrategy(options,
partitionType);
+ case CUSTOM:
+ return PartitionExpireStrategyFactory.INSTANCE
+ .get()
+ .create(catalogLoader, identifier, partitionType);
+ default:
+ throw new IllegalArgumentException(
+ "Unknown partitionExpireStrategy: " +
options.partitionExpireStrategy());
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategyFactory.java
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategyFactory.java
new file mode 100644
index 0000000000..6e89e98725
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategyFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.partition;
+
+import org.apache.paimon.catalog.CatalogLoader;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.factories.FactoryUtil;
+import org.apache.paimon.types.RowType;
+
+import org.apache.paimon.shade.guava30.com.google.common.base.Supplier;
+import org.apache.paimon.shade.guava30.com.google.common.base.Suppliers;
+
+/**
+ * Factory to create a {@link PartitionExpireStrategy}.
+ *
+ * <p>{@code PartitionExpireStrategy.createPartitionExpireStrategy} goes through {@link #INSTANCE}
+ * when the table is configured with the {@code CUSTOM} expiration strategy.
+ */
+public interface PartitionExpireStrategyFactory {
+
+    /**
+     * Memoized supplier of the factory. The factory is resolved with {@code
+     * FactoryUtil.discoverSingletonFactory}, using the class loader of {@link
+     * PartitionExpireStrategy}.
+     */
+    Supplier<PartitionExpireStrategyFactory> INSTANCE =
+            Suppliers.memoize(
+                    () ->
+                            FactoryUtil.discoverSingletonFactory(
+                                    PartitionExpireStrategy.class.getClassLoader(),
+                                    PartitionExpireStrategyFactory.class));
+
+    /**
+     * Creates the expire strategy.
+     *
+     * @param catalogLoader catalog loader passed in by the caller; {@code
+     *     createPartitionExpireStrategy} declares its own parameter {@code @Nullable}, so this
+     *     may be {@code null}
+     * @param identifier table identifier passed in by the caller; may be {@code null} for the
+     *     same reason
+     * @param partitionType the partition row type
+     * @return the strategy to use
+     */
+    PartitionExpireStrategy create(
+            CatalogLoader catalogLoader, Identifier identifier, RowType partitionType);
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/partition/CustomPartitionExpirationFactory.java
b/paimon-core/src/test/java/org/apache/paimon/partition/CustomPartitionExpirationFactory.java
new file mode 100644
index 0000000000..4f2b770d55
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/partition/CustomPartitionExpirationFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.partition;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.RowType;
+
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Custom {@link PartitionExpireStrategyFactory}.
+ *
+ * <p>The strategy it creates does not compute anything: the partitions it reports are read from
+ * {@link #TABLE_EXPIRE_PARTITIONS}, keyed by the table's {@code path} option.
+ */
+public class CustomPartitionExpirationFactory implements PartitionExpireStrategyFactory {
+
+    /** Partitions to report as expired, keyed by the value of the table's {@code path} option. */
+    public static final Map<String, List<PartitionEntry>> TABLE_EXPIRE_PARTITIONS = new HashMap<>();
+
+    @Override
+    public PartitionExpireStrategy create(
+            CatalogLoader catalogLoader, Identifier identifier, RowType partitionType) {
+        return new PartitionExpireStrategy(partitionType) {
+            /**
+             * Returns the entries registered for this table, or an empty list when none are
+             * registered. The scan and the expiration time are not consulted.
+             */
+            @Override
+            public List<PartitionEntry> selectExpiredPartitions(
+                    FileStoreScan scan, LocalDateTime expirationTime) {
+                Table table;
+                // The catalog is only needed to resolve the table; close it right after.
+                try (Catalog catalog = catalogLoader.load()) {
+                    table = catalog.getTable(identifier);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                // A table without registered entries has nothing to expire; never hand a null
+                // list back to the caller.
+                return TABLE_EXPIRE_PARTITIONS.getOrDefault(
+                        table.options().get("path"), Collections.emptyList());
+            }
+        };
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java
new file mode 100644
index 0000000000..c2e1356c15
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.partition;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.apache.paimon.CoreOptions.END_INPUT_CHECK_PARTITION_EXPIRE;
+import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_STRATEGY;
+import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
+import static
org.apache.paimon.partition.CustomPartitionExpirationFactory.TABLE_EXPIRE_PARTITIONS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for partition expiration with the {@code custom} strategy. */
+class PartitionExpireTableTest extends TableTestBase {
+
+    @Test
+    public void testCustomExpire() throws Exception {
+        // Table partitioned by "pt" with the custom expiration strategy configured.
+        Schema.Builder builder = Schema.newBuilder();
+        builder.column("f0", DataTypes.INT());
+        builder.column("pt", DataTypes.INT());
+        builder.partitionKeys("pt");
+        builder.option(PARTITION_EXPIRATION_STRATEGY.key(), "custom");
+        builder.option(PARTITION_EXPIRATION_TIME.key(), "1 d");
+        builder.option(END_INPUT_CHECK_PARTITION_EXPIRE.key(), "true");
+        catalog.createTable(identifier(), builder.build(), true);
+
+        Table table = catalog.getTable(identifier());
+        write(table, GenericRow.of(1, 1));
+        write(table, GenericRow.of(2, 2));
+        assertThat(read(table)).containsExactlyInAnyOrder(GenericRow.of(1, 1), GenericRow.of(2, 2));
+
+        String tablePath = table.options().get("path");
+        try {
+            // Register partition pt=1 for this table; after the next write its row must be gone.
+            PartitionEntry expiredPartition =
+                    new PartitionEntry(BinaryRow.singleColumn(1), 1, 1, 1, 1);
+            TABLE_EXPIRE_PARTITIONS.put(tablePath, Collections.singletonList(expiredPartition));
+            write(table, GenericRow.of(3, 3));
+            assertThat(read(table))
+                    .containsExactlyInAnyOrder(GenericRow.of(3, 3), GenericRow.of(2, 2));
+        } finally {
+            // The map is static; do not leak the entry into other tests.
+            TABLE_EXPIRE_PARTITIONS.remove(tablePath);
+        }
+    }
+}
diff --git
a/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.partition.PartitionExpireStrategyFactory
b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.partition.PartitionExpireStrategyFactory
new file mode 100644
index 0000000000..551d126b38
--- /dev/null
+++
b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.partition.PartitionExpireStrategyFactory
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.paimon.partition.CustomPartitionExpirationFactory
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
index b025c3a954..cc98303bd5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.TimeUtils;
@@ -63,7 +64,10 @@ public class ExpirePartitionsAction extends TableActionBase {
TimeUtils.parseDuration(expirationTime),
Duration.ofMillis(0L),
createPartitionExpireStrategy(
- CoreOptions.fromMap(map),
fileStore.partitionType()));
+ CoreOptions.fromMap(map),
+ fileStore.partitionType(),
+ catalogLoader(),
+ new Identifier(databaseName, tableName)));
}
@Override