This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 72bdfbf5c [core] Fix the expired partition sync to metastore (#3534)
72bdfbf5c is described below

commit 72bdfbf5ca717be9bfac59ccf9d3a7b63d784c69
Author: gsralex <[email protected]>
AuthorDate: Mon Jun 24 13:00:31 2024 +0800

    [core] Fix the expired partition sync to metastore (#3534)
---
 .../java/org/apache/paimon/AbstractFileStore.java  | 10 +++++++-
 .../apache/paimon/operation/PartitionExpire.java   | 24 ++++++++++++++++-
 .../flink/procedure/ExpirePartitionsProcedure.java | 10 +++++++-
 .../apache/paimon/hive/HiveCatalogITCaseBase.java  | 30 ++++++++++++++++++++++
 .../spark/procedure/ExpirePartitionsProcedure.java | 10 +++++++-
 5 files changed, 80 insertions(+), 4 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index ca8b90994..0ca4ad68c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -255,6 +255,13 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
             return null;
         }
 
+        MetastoreClient.Factory metastoreClientFactory =
+                catalogEnvironment.metastoreClientFactory();
+        MetastoreClient metastoreClient = null;
+        if (options.partitionedTableInMetastore() && metastoreClientFactory != 
null) {
+            metastoreClient = metastoreClientFactory.create();
+        }
+
         return new PartitionExpire(
                 partitionType(),
                 partitionExpireTime,
@@ -262,7 +269,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
                 options.partitionTimestampPattern(),
                 options.partitionTimestampFormatter(),
                 newScan(),
-                newCommit(commitUser));
+                newCommit(commitUser),
+                metastoreClient);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index f99ff57af..4eb352bce 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.partition.PartitionTimeExtractor;
 import org.apache.paimon.types.RowType;
@@ -30,6 +31,8 @@ import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
@@ -50,6 +53,7 @@ public class PartitionExpire {
     private final PartitionTimeExtractor timeExtractor;
     private final FileStoreScan scan;
     private final FileStoreCommit commit;
+    private final MetastoreClient metastoreClient;
 
     private LocalDateTime lastCheck;
 
@@ -60,7 +64,8 @@ public class PartitionExpire {
             String timePattern,
             String timeFormatter,
             FileStoreScan scan,
-            FileStoreCommit commit) {
+            FileStoreCommit commit,
+            @Nullable MetastoreClient metastoreClient) {
         this.partitionKeys = partitionType.getFieldNames();
         this.toObjectArrayConverter = new 
RowDataToObjectArrayConverter(partitionType);
         this.expirationTime = expirationTime;
@@ -68,6 +73,7 @@ public class PartitionExpire {
         this.timeExtractor = new PartitionTimeExtractor(timePattern, 
timeFormatter);
         this.scan = scan;
         this.commit = commit;
+        this.metastoreClient = metastoreClient;
         this.lastCheck = LocalDateTime.now();
     }
 
@@ -103,6 +109,22 @@ public class PartitionExpire {
         }
         if (expired.size() > 0) {
             commit.dropPartitions(expired, commitIdentifier);
+            if (metastoreClient != null) {
+                deleteMetastorePartitions(expired);
+            }
+        }
+    }
+
+    private void deleteMetastorePartitions(List<Map<String, String>> 
partitions) {
+        if (metastoreClient != null) {
+            partitions.forEach(
+                    partition -> {
+                        try {
+                            metastoreClient.deletePartition(new 
LinkedHashMap<>(partition));
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
index 6e21b3522..a114d12a1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.FileStore;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.TimeUtils;
@@ -30,6 +31,7 @@ import org.apache.flink.table.annotation.ProcedureHint;
 import org.apache.flink.table.procedure.ProcedureContext;
 
 import java.time.Duration;
+import java.util.Optional;
 
 /** A procedure to expire partitions. */
 public class ExpirePartitionsProcedure extends ProcedureBase {
@@ -60,7 +62,13 @@ public class ExpirePartitionsProcedure extends ProcedureBase {
                         null,
                         timestampFormatter,
                         fileStore.newScan(),
-                        fileStore.newCommit(""));
+                        fileStore.newCommit(""),
+                        Optional.ofNullable(
+                                        fileStoreTable
+                                                .catalogEnvironment()
+                                                .metastoreClientFactory())
+                                .map(MetastoreClient.Factory::create)
+                                .orElse(null));
         partitionExpire.expire(Long.MAX_VALUE);
         return new String[] {};
     }
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index cfff7f38c..a73510df4 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -1246,6 +1246,36 @@ public abstract class HiveCatalogITCaseBase {
                 .containsExactlyInAnyOrder("dt=2020-01-02/hh=09", 
"dt=2020-01-03/hh=10");
     }
 
+    @Test
+    public void testExpiredPartitionsSyncToMetastore() throws Exception {
+        // Use flink to create a partitioned table and write data, hive read.
+        tEnv.executeSql("drop table if exists students").await();
+        tEnv.executeSql(
+                        "create table students\n"
+                                + "(id string\n"
+                                + ",dt string\n"
+                                + ",PRIMARY KEY(id,dt) NOT ENFORCED\n"
+                                + ") PARTITIONED BY (dt)\n"
+                                + "WITH (\n"
+                                + "'bucket' = '-1',\n"
+                                + "'file.format' = 'parquet',\n"
+                                + "'metastore.partitioned-table' = 'true'\n"
+                                + ");")
+                .await();
+
+        tEnv.executeSql("insert into students values('1', 
'2024-06-15')").await();
+        tEnv.executeSql("insert into students values('1', 
'9998-06-15')").await();
+        tEnv.executeSql("insert into students values('1', 
'9999-06-15')").await();
+
+        assertThat(hiveShell.executeQuery("show partitions students"))
+                .containsExactlyInAnyOrder("dt=2024-06-15", "dt=9998-06-15", 
"dt=9999-06-15");
+        tEnv.executeSql(
+                        "CALL sys.expire_partitions(`table` => 
'test_db.students', expiration_time => '1 d', timestamp_formatter => 
'yyyy-MM-dd')")
+                .await();
+        assertThat(hiveShell.executeQuery("show partitions students"))
+                .containsExactlyInAnyOrder("dt=9998-06-15", "dt=9999-06-15");
+    }
+
     /** Prepare to update a paimon table with a custom path in the paimon file 
system. */
     private void alterTableInFileSystem(TableEnvironment tEnv) throws 
Exception {
         tEnv.executeSql(
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
index b41c37181..16089937f 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark.procedure;
 
 import org.apache.paimon.FileStore;
+import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.TimeUtils;
@@ -32,6 +33,7 @@ import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
 import java.time.Duration;
+import java.util.Optional;
 
 import static org.apache.spark.sql.types.DataTypes.StringType;
 
@@ -83,7 +85,13 @@ public class ExpirePartitionsProcedure extends BaseProcedure {
                                     null,
                                     timestampFormatter,
                                     fileStore.newScan(),
-                                    fileStore.newCommit(""));
+                                    fileStore.newCommit(""),
+                                    Optional.ofNullable(
+                                                    fileStoreTable
+                                                            
.catalogEnvironment()
+                                                            
.metastoreClientFactory())
+                                            
.map(MetastoreClient.Factory::create)
+                                            .orElse(null));
                     partitionExpire.expire(Long.MAX_VALUE);
                     InternalRow outputRow = newInternalRow(true);
                     return new InternalRow[] {outputRow};

Reply via email to