This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.8
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit cfcc72d7d86e36a0a505023d500a663f23f6dfe0 Author: Kerwin <[email protected]> AuthorDate: Wed May 29 10:34:34 2024 +0800 [hive] Fix flink not synchronizing to hive metastore when deleting table partitions using hive catalog (#3411) --- .../apache/paimon/table/PrimaryKeyTableUtils.java | 8 +------- .../java/org/apache/paimon/hive/HiveCatalog.java | 20 ++++++++++++++++++++ .../apache/paimon/hive/HiveCatalogITCaseBase.java | 13 +++++++++++++ 3 files changed, 34 insertions(+), 7 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java index 572e488c6..b3dbbdd29 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java @@ -45,13 +45,7 @@ public class PrimaryKeyTableUtils { public static List<DataField> addKeyNamePrefix(List<DataField> keyFields) { return keyFields.stream() - .map( - f -> - new DataField( - f.id(), - KEY_FIELD_PREFIX + f.name(), - f.type(), - f.description())) + .map(f -> f.newName(KEY_FIELD_PREFIX + f.name())) .collect(Collectors.toList()); } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 0d71e70a6..7da826e3f 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -72,6 +72,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -267,6 +268,25 @@ public class HiveCatalog extends AbstractCatalog { } } + @Override + public void dropPartition(Identifier identifier, Map<String, 
String> partitionSpec) + throws TableNotExistException { + TableSchema tableSchema = getDataTableSchema(identifier); + if (!tableSchema.partitionKeys().isEmpty() + && new CoreOptions(tableSchema.options()).partitionedTableInMetastore()) { + try { + // Do not close client, it is for HiveCatalog + @SuppressWarnings("resource") + HiveMetastoreClient metastoreClient = + new HiveMetastoreClient(identifier, tableSchema, client); + metastoreClient.deletePartition(new LinkedHashMap<>(partitionSpec)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + super.dropPartition(identifier, partitionSpec); + } + private Map<String, String> convertToProperties(Database database) { Map<String, String> properties = new HashMap<>(database.getParameters()); if (database.getLocationUri() != null) { diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java index e478d44cd..4e6fba9e7 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java @@ -950,6 +950,19 @@ public abstract class HiveCatalogITCaseBase { .containsExactlyInAnyOrder("1\t10", "2\t20"); } + @Test + public void testDropPartitionsToMetastore() throws Exception { + prepareTestAddPartitionsToMetastore(); + + // drop partition + tEnv.executeSql( + "ALTER TABLE t DROP PARTITION (ptb = '1a', pta = 1), PARTITION (ptb = '1b', pta = 1)") + .await(); + assertThat(hiveShell.executeQuery("show partitions t")) + .containsExactlyInAnyOrder( + "ptb=2a/pta=2", "ptb=2b/pta=2", "ptb=3a/pta=3", "ptb=3b/pta=3"); + } + @Test public void testAddPartitionsForTag() throws Exception { tEnv.executeSql(
