This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8791c5d98 [hive] Fix flink not synchronizing to hive metastore when
deleting table partitions using hive catalog (#3411)
8791c5d98 is described below
commit 8791c5d9817d2c38d6f3422a4aba799ebf403846
Author: Kerwin <[email protected]>
AuthorDate: Wed May 29 10:34:34 2024 +0800
[hive] Fix flink not synchronizing to hive metastore when deleting table
partitions using hive catalog (#3411)
---
.../apache/paimon/table/PrimaryKeyTableUtils.java | 8 +-------
.../java/org/apache/paimon/hive/HiveCatalog.java | 20 ++++++++++++++++++++
.../apache/paimon/hive/HiveCatalogITCaseBase.java | 13 +++++++++++++
3 files changed, 34 insertions(+), 7 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
index 572e488c6..b3dbbdd29 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
@@ -45,13 +45,7 @@ public class PrimaryKeyTableUtils {
public static List<DataField> addKeyNamePrefix(List<DataField> keyFields) {
return keyFields.stream()
- .map(
- f ->
- new DataField(
- f.id(),
- KEY_FIELD_PREFIX + f.name(),
- f.type(),
- f.description()))
+ .map(f -> f.newName(KEY_FIELD_PREFIX + f.name()))
.collect(Collectors.toList());
}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 25cb8784d..6a1bb9d0a 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -73,6 +73,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -268,6 +269,25 @@ public class HiveCatalog extends AbstractCatalog {
}
}
+ @Override
+ public void dropPartition(Identifier identifier, Map<String, String> partitionSpec)
+ throws TableNotExistException {
+ TableSchema tableSchema = getDataTableSchema(identifier);
+ if (!tableSchema.partitionKeys().isEmpty()
+ && new CoreOptions(tableSchema.options()).partitionedTableInMetastore()) {
+ try {
+ // Do not close client, it is for HiveCatalog
+ @SuppressWarnings("resource")
+ HiveMetastoreClient metastoreClient =
+ new HiveMetastoreClient(identifier, tableSchema, client);
+ metastoreClient.deletePartition(new LinkedHashMap<>(partitionSpec));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ super.dropPartition(identifier, partitionSpec);
+ }
+
private Map<String, String> convertToProperties(Database database) {
Map<String, String> properties = new HashMap<>(database.getParameters());
if (database.getLocationUri() != null) {
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index ac140d949..cfff7f38c 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -970,6 +970,19 @@ public abstract class HiveCatalogITCaseBase {
.containsExactlyInAnyOrder("1\t10", "2\t20");
}
+ @Test
+ public void testDropPartitionsToMetastore() throws Exception {
+ prepareTestAddPartitionsToMetastore();
+
+ // drop partition
+ tEnv.executeSql(
+ "ALTER TABLE t DROP PARTITION (ptb = '1a', pta = 1), PARTITION (ptb = '1b', pta = 1)")
+ .await();
+ assertThat(hiveShell.executeQuery("show partitions t"))
+ .containsExactlyInAnyOrder(
+ "ptb=2a/pta=2", "ptb=2b/pta=2", "ptb=3a/pta=3", "ptb=3b/pta=3");
+ }
+
@Test
public void testAddPartitionsForTag() throws Exception {
tEnv.executeSql(