This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 72bdfbf5c [core] Fix the expired partition sync to metastore (#3534)
72bdfbf5c is described below
commit 72bdfbf5ca717be9bfac59ccf9d3a7b63d784c69
Author: gsralex <[email protected]>
AuthorDate: Mon Jun 24 13:00:31 2024 +0800
[core] Fix the expired partition sync to metastore (#3534)
---
.../java/org/apache/paimon/AbstractFileStore.java | 10 +++++++-
.../apache/paimon/operation/PartitionExpire.java | 24 ++++++++++++++++-
.../flink/procedure/ExpirePartitionsProcedure.java | 10 +++++++-
.../apache/paimon/hive/HiveCatalogITCaseBase.java | 30 ++++++++++++++++++++++
.../spark/procedure/ExpirePartitionsProcedure.java | 10 +++++++-
5 files changed, 80 insertions(+), 4 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index ca8b90994..0ca4ad68c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -255,6 +255,13 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
return null;
}
+ MetastoreClient.Factory metastoreClientFactory =
+ catalogEnvironment.metastoreClientFactory();
+ MetastoreClient metastoreClient = null;
+ if (options.partitionedTableInMetastore() && metastoreClientFactory != null) {
+ metastoreClient = metastoreClientFactory.create();
+ }
+
return new PartitionExpire(
partitionType(),
partitionExpireTime,
@@ -262,7 +269,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
options.partitionTimestampPattern(),
options.partitionTimestampFormatter(),
newScan(),
- newCommit(commitUser));
+ newCommit(commitUser),
+ metastoreClient);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index f99ff57af..4eb352bce 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.partition.PartitionTimeExtractor;
import org.apache.paimon.types.RowType;
@@ -30,6 +31,8 @@ import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
@@ -50,6 +53,7 @@ public class PartitionExpire {
private final PartitionTimeExtractor timeExtractor;
private final FileStoreScan scan;
private final FileStoreCommit commit;
+ private final MetastoreClient metastoreClient;
private LocalDateTime lastCheck;
@@ -60,7 +64,8 @@ public class PartitionExpire {
String timePattern,
String timeFormatter,
FileStoreScan scan,
- FileStoreCommit commit) {
+ FileStoreCommit commit,
+ @Nullable MetastoreClient metastoreClient) {
this.partitionKeys = partitionType.getFieldNames();
this.toObjectArrayConverter = new RowDataToObjectArrayConverter(partitionType);
this.expirationTime = expirationTime;
@@ -68,6 +73,7 @@ public class PartitionExpire {
this.timeExtractor = new PartitionTimeExtractor(timePattern, timeFormatter);
this.scan = scan;
this.commit = commit;
+ this.metastoreClient = metastoreClient;
this.lastCheck = LocalDateTime.now();
}
@@ -103,6 +109,22 @@ public class PartitionExpire {
}
if (expired.size() > 0) {
commit.dropPartitions(expired, commitIdentifier);
+ if (metastoreClient != null) {
+ deleteMetastorePartitions(expired);
+ }
+ }
+ }
+
+ private void deleteMetastorePartitions(List<Map<String, String>> partitions) {
+ if (metastoreClient != null) {
+ partitions.forEach(
+ partition -> {
+ try {
+ metastoreClient.deletePartition(new LinkedHashMap<>(partition));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
index 6e21b3522..a114d12a1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.FileStore;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.TimeUtils;
@@ -30,6 +31,7 @@ import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;
import java.time.Duration;
+import java.util.Optional;
/** A procedure to expire partitions. */
public class ExpirePartitionsProcedure extends ProcedureBase {
@@ -60,7 +62,13 @@ public class ExpirePartitionsProcedure extends ProcedureBase {
null,
timestampFormatter,
fileStore.newScan(),
- fileStore.newCommit(""));
+ fileStore.newCommit(""),
+ Optional.ofNullable(
+ fileStoreTable
+ .catalogEnvironment()
+ .metastoreClientFactory())
+ .map(MetastoreClient.Factory::create)
+ .orElse(null));
partitionExpire.expire(Long.MAX_VALUE);
return new String[] {};
}
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index cfff7f38c..a73510df4 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -1246,6 +1246,36 @@ public abstract class HiveCatalogITCaseBase {
.containsExactlyInAnyOrder("dt=2020-01-02/hh=09", "dt=2020-01-03/hh=10");
}
+ @Test
+ public void testExpiredPartitionsSyncToMetastore() throws Exception {
+ // Use flink to create a partitioned table and write data, hive read.
+ tEnv.executeSql("drop table if exists students").await();
+ tEnv.executeSql(
+ "create table students\n"
+ + "(id string\n"
+ + ",dt string\n"
+ + ",PRIMARY KEY(id,dt) NOT ENFORCED\n"
+ + ") PARTITIONED BY (dt)\n"
+ + "WITH (\n"
+ + "'bucket' = '-1',\n"
+ + "'file.format' = 'parquet',\n"
+ + "'metastore.partitioned-table' = 'true'\n"
+ + ");")
+ .await();
+
+ tEnv.executeSql("insert into students values('1', '2024-06-15')").await();
+ tEnv.executeSql("insert into students values('1', '9998-06-15')").await();
+ tEnv.executeSql("insert into students values('1', '9999-06-15')").await();
+
+ assertThat(hiveShell.executeQuery("show partitions students"))
+ .containsExactlyInAnyOrder("dt=2024-06-15", "dt=9998-06-15", "dt=9999-06-15");
+ tEnv.executeSql(
+ "CALL sys.expire_partitions(`table` => 'test_db.students', expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')")
+ .await();
+ assertThat(hiveShell.executeQuery("show partitions students"))
+ .containsExactlyInAnyOrder("dt=9998-06-15", "dt=9999-06-15");
+ }
+
/** Prepare to update a paimon table with a custom path in the paimon file system. */
private void alterTableInFileSystem(TableEnvironment tEnv) throws Exception {
tEnv.executeSql(
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
index b41c37181..16089937f 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
@@ -19,6 +19,7 @@
package org.apache.paimon.spark.procedure;
import org.apache.paimon.FileStore;
+import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.TimeUtils;
@@ -32,6 +33,7 @@ import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.time.Duration;
+import java.util.Optional;
import static org.apache.spark.sql.types.DataTypes.StringType;
@@ -83,7 +85,13 @@ public class ExpirePartitionsProcedure extends BaseProcedure {
null,
timestampFormatter,
fileStore.newScan(),
- fileStore.newCommit(""));
+ fileStore.newCommit(""),
+ Optional.ofNullable(
+ fileStoreTable
+ .catalogEnvironment()
+ .metastoreClientFactory())
+ .map(MetastoreClient.Factory::create)
+ .orElse(null));
partitionExpire.expire(Long.MAX_VALUE);
InternalRow outputRow = newInternalRow(true);
return new InternalRow[] {outputRow};