This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f4d524e5ec [core] Introduce custom PartitionExpireStrategy (#5625)
f4d524e5ec is described below

commit f4d524e5ec6ab0eb2dd7572be6a08702cb73b81c
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon May 19 19:37:15 2025 +0800

    [core] Introduce custom PartitionExpireStrategy (#5625)
---
 .../shortcodes/generated/core_configuration.html   |  2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  4 +-
 .../org/apache/paimon/factories/FactoryUtil.java   | 33 +++++++++++
 .../java/org/apache/paimon/AbstractFileStore.java  |  7 ++-
 .../paimon/partition/PartitionExpireStrategy.java  | 17 +++++-
 .../partition/PartitionExpireStrategyFactory.java  | 41 +++++++++++++
 .../CustomPartitionExpirationFactory.java          | 56 ++++++++++++++++++
 .../paimon/partition/PartitionExpireTableTest.java | 68 ++++++++++++++++++++++
 ...paimon.partition.PartitionExpireStrategyFactory | 19 ++++++
 .../flink/action/ExpirePartitionsAction.java       |  6 +-
 10 files changed, 247 insertions(+), 6 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index c2d91ea775..0de4bf5794 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -699,7 +699,7 @@ This config option does not affect the default filesystem 
metastore.</td>
             <td><h5>partition.expiration-strategy</h5></td>
             <td style="word-wrap: break-word;">values-time</td>
             <td><p>Enum</p></td>
-            <td>The strategy determines how to extract the partition time and 
compare it with the current time.<br /><br />Possible 
values:<ul><li>"values-time": This strategy compares the time extracted from 
the partition value with the current time.</li><li>"update-time": This strategy 
compares the last update time of the partition with the current 
time.</li></ul></td>
+            <td>The strategy determines how to extract the partition time and 
compare it with the current time.<br /><br />Possible 
values:<ul><li>"values-time": This strategy compares the time extracted from 
the partition value with the current time.</li><li>"update-time": This strategy 
compares the last update time of the partition with the current 
time.</li><li>"custom": This strategy use custom class to expire 
partitions.</li></ul></td>
         </tr>
         <tr>
             <td><h5>partition.expiration-time</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 56cb707742..28e7acda81 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -3248,7 +3248,9 @@ public class CoreOptions implements Serializable {
 
         UPDATE_TIME(
                 "update-time",
-                "This strategy compares the last update time of the partition 
with the current time.");
+                "This strategy compares the last update time of the partition 
with the current time."),
+
+        CUSTOM("custom", "This strategy use custom class to expire 
partitions.");
 
         private final String value;
 
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/factories/FactoryUtil.java 
b/paimon-api/src/main/java/org/apache/paimon/factories/FactoryUtil.java
index a492aeece7..8431c77872 100644
--- a/paimon-api/src/main/java/org/apache/paimon/factories/FactoryUtil.java
+++ b/paimon-api/src/main/java/org/apache/paimon/factories/FactoryUtil.java
@@ -140,4 +140,37 @@ public class FactoryUtil {
 
         return loadResults;
     }
+
+    /**
+     * Discovers exactly one factory implementing {@code klass} using the given class loader.
+     *
+     * @param classLoader the class loader handed to {@link #discoverFactories} for the lookup
+     * @param klass the factory type; exactly one implementation must be discoverable
+     * @param <T> the type of the factory
+     * @return the sole implementation; throws {@link FactoryException} if none or several exist
+     */
+    public static <T> T discoverSingletonFactory(ClassLoader classLoader, Class<T> klass) {
+        List<T> factories = FactoryUtil.discoverFactories(classLoader, klass);
+        if (factories.isEmpty()) {
+            throw new FactoryException(
+                    String.format(
+                            "Could not find any factories that implement '%s' in the classpath.",
+                            klass.getName()));
+        }
+
+        if (factories.size() > 1) {
+            throw new FactoryException(
+                    String.format(
+                            "Multiple factories that implement '%s' found in the classpath.\n\n"
+                                    + "Ambiguous factory classes are:\n\n"
+                                    + "%s",
+                            klass.getName(),
+                            factories.stream()
+                                    .map(f -> f.getClass().getName())
+                                    .sorted() // deterministic order for the error message
+                                    .collect(Collectors.joining("\n"))));
+        }
+
+        return factories.get(0); // exactly one factory remains at this point
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 929d760b0b..65e35e4345 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -79,6 +79,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
+import static 
org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
@@ -423,7 +424,11 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
                 table,
                 partitionExpireTime,
                 options.partitionExpireCheckInterval(),
-                PartitionExpireStrategy.createPartitionExpireStrategy(options, 
partitionType()));
+                createPartitionExpireStrategy(
+                        options,
+                        partitionType(),
+                        catalogEnvironment.catalogLoader(),
+                        catalogEnvironment.identifier()));
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java
 
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java
index 2def5b43a9..7d016c0931 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java
@@ -19,12 +19,16 @@
 package org.apache.paimon.partition;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.CatalogLoader;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 
+import javax.annotation.Nullable;
+
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
@@ -66,13 +70,22 @@ public abstract class PartitionExpireStrategy {
             FileStoreScan scan, LocalDateTime expirationTime);
 
     public static PartitionExpireStrategy createPartitionExpireStrategy(
-            CoreOptions options, RowType partitionType) {
+            CoreOptions options,
+            RowType partitionType,
+            @Nullable CatalogLoader catalogLoader, // only read by the CUSTOM branch
+            @Nullable Identifier identifier) { // only read by the CUSTOM branch
         switch (options.partitionExpireStrategy()) {
             case UPDATE_TIME:
                 return new PartitionUpdateTimeExpireStrategy(partitionType);
             case VALUES_TIME:
-            default:
                 return new PartitionValuesTimeExpireStrategy(options, partitionType);
+            case CUSTOM: // nullable catalogLoader/identifier are forwarded to the factory as-is
+                return PartitionExpireStrategyFactory.INSTANCE
+                        .get()
+                        .create(catalogLoader, identifier, partitionType);
+            default: // fail fast rather than silently falling back to VALUES_TIME
+                throw new IllegalArgumentException(
+                        "Unknown partitionExpireStrategy: " + options.partitionExpireStrategy());
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategyFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategyFactory.java
new file mode 100644
index 0000000000..6e89e98725
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategyFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.partition;
+
+import org.apache.paimon.catalog.CatalogLoader;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.factories.FactoryUtil;
+import org.apache.paimon.types.RowType;
+
+import org.apache.paimon.shade.guava30.com.google.common.base.Supplier;
+import org.apache.paimon.shade.guava30.com.google.common.base.Suppliers;
+
+/** Factory to create a {@link PartitionExpireStrategy}. */
+public interface PartitionExpireStrategyFactory {
+
+    PartitionExpireStrategy create(
+            CatalogLoader catalogLoader, Identifier identifier, RowType 
partitionType);
+
+    Supplier<PartitionExpireStrategyFactory> INSTANCE =
+            Suppliers.memoize(
+                    () ->
+                            FactoryUtil.discoverSingletonFactory(
+                                    
PartitionExpireStrategy.class.getClassLoader(),
+                                    PartitionExpireStrategyFactory.class));
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/partition/CustomPartitionExpirationFactory.java
 
b/paimon-core/src/test/java/org/apache/paimon/partition/CustomPartitionExpirationFactory.java
new file mode 100644
index 0000000000..4f2b770d55
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/partition/CustomPartitionExpirationFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.partition;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.RowType;
+
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test {@link PartitionExpireStrategyFactory} whose strategy reports as expired exactly the
+ * partitions registered in {@link #TABLE_EXPIRE_PARTITIONS} under the table's {@code path} option.
+ */
+public class CustomPartitionExpirationFactory implements PartitionExpireStrategyFactory {
+
+    /** Partitions to report as expired, keyed by the table's {@code path} option. */
+    public static final Map<String, List<PartitionEntry>> TABLE_EXPIRE_PARTITIONS = new HashMap<>();
+
+    @Override
+    public PartitionExpireStrategy create(
+            CatalogLoader catalogLoader, Identifier identifier, RowType partitionType) {
+        return new PartitionExpireStrategy(partitionType) {
+            @Override
+            public List<PartitionEntry> selectExpiredPartitions(
+                    FileStoreScan scan, LocalDateTime expirationTime) {
+                // Reload the table through the catalog to read its "path" option.
+                Table table;
+                try (Catalog catalog = catalogLoader.load()) {
+                    table = catalog.getTable(identifier);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                // Commits made before a test registers partitions (or after it cleans up) must
+                // see "nothing expired" instead of a null list.
+                return TABLE_EXPIRE_PARTITIONS.getOrDefault(
+                        table.options().get("path"), Collections.emptyList());
+            }
+        };
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java
new file mode 100644
index 0000000000..c2e1356c15
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionExpireTableTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.partition;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.apache.paimon.CoreOptions.END_INPUT_CHECK_PARTITION_EXPIRE;
+import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_STRATEGY;
+import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
+import static 
org.apache.paimon.partition.CustomPartitionExpirationFactory.TABLE_EXPIRE_PARTITIONS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class PartitionExpireTableTest extends TableTestBase {
+
+    @Test
+    public void testCustomExpire() throws Exception { // uses CustomPartitionExpirationFactory
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("pt", DataTypes.INT());
+        schemaBuilder.partitionKeys("pt");
+        schemaBuilder.option(PARTITION_EXPIRATION_STRATEGY.key(), "custom"); // SPI-loaded factory
+        schemaBuilder.option(PARTITION_EXPIRATION_TIME.key(), "1 d");
+        schemaBuilder.option(END_INPUT_CHECK_PARTITION_EXPIRE.key(), "true");
+        catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+        Table table = catalog.getTable(identifier());
+        write(table, GenericRow.of(1, 1)); // nothing registered for this table yet
+        write(table, GenericRow.of(2, 2));
+        assertThat(read(table)).containsExactlyInAnyOrder(GenericRow.of(1, 1), GenericRow.of(2, 2));
+
+        String path = table.options().get("path"); // key the custom factory looks up
+        try {
+            PartitionEntry expire = new PartitionEntry(BinaryRow.singleColumn(1), 1, 1, 1, 1);
+            TABLE_EXPIRE_PARTITIONS.put(path, Collections.singletonList(expire)); // expire pt=1
+            write(table, GenericRow.of(3, 3)); // expiration on this commit should drop pt=1
+            assertThat(read(table))
+                    .containsExactlyInAnyOrder(GenericRow.of(3, 3), GenericRow.of(2, 2));
+        } finally {
+            TABLE_EXPIRE_PARTITIONS.remove(path); // static map: avoid leaking into other tests
+        }
+    }
+}
diff --git 
a/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.partition.PartitionExpireStrategyFactory
 
b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.partition.PartitionExpireStrategyFactory
new file mode 100644
index 0000000000..551d126b38
--- /dev/null
+++ 
b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.partition.PartitionExpireStrategyFactory
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.paimon.partition.CustomPartitionExpirationFactory
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
index b025c3a954..cc98303bd5 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.FileStore;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.TimeUtils;
@@ -63,7 +64,10 @@ public class ExpirePartitionsAction extends TableActionBase {
                         TimeUtils.parseDuration(expirationTime),
                         Duration.ofMillis(0L),
                         createPartitionExpireStrategy(
-                                CoreOptions.fromMap(map), 
fileStore.partitionType()));
+                                CoreOptions.fromMap(map),
+                                fileStore.partitionType(),
+                                catalogLoader(),
+                                new Identifier(databaseName, tableName)));
     }
 
     @Override

Reply via email to