This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b49f2553b [hive] Partition is dropped from metastore only when all branches do not contain this partition (#3867)
b49f2553b is described below

commit b49f2553bbebf8945c7558eba9582323cfba6c7d
Author: tsreaper <[email protected]>
AuthorDate: Mon Aug 5 18:51:31 2024 +0800

    [hive] Partition is dropped from metastore only when all branches do not contain this partition (#3867)
---
 .../java/org/apache/paimon/branch/TableBranch.java | 66 ----------------
 .../apache/paimon/table/system/BranchesTable.java  | 87 +++++++++++++++++++---
 .../org/apache/paimon/utils/BranchManager.java     | 56 +-------------
 .../org/apache/paimon/flink/BranchSqlITCase.java   | 39 +++-------
 .../java/org/apache/paimon/hive/HiveCatalog.java   | 66 ++++++++++++----
 .../apache/paimon/hive/HiveCatalogITCaseBase.java  | 53 ++++++++++++-
 6 files changed, 196 insertions(+), 171 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java 
b/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java
deleted file mode 100644
index 9b5b478fc..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.branch;
-
-/** {@link TableBranch} has branch relevant information for table. */
-public class TableBranch {
-    private final String branchName;
-    private final String createdFromTag;
-    private final Long createdFromSnapshot;
-
-    private final long createTime;
-
-    public TableBranch(String branchName, Long createdFromSnapshot, long 
createTime) {
-        this.branchName = branchName;
-        this.createdFromTag = null;
-        this.createdFromSnapshot = createdFromSnapshot;
-        this.createTime = createTime;
-    }
-
-    public TableBranch(String branchName, long createTime) {
-        this.branchName = branchName;
-        this.createdFromTag = null;
-        this.createdFromSnapshot = null;
-        this.createTime = createTime;
-    }
-
-    public TableBranch(
-            String branchName, String createdFromTag, Long 
createdFromSnapshot, long createTime) {
-        this.branchName = branchName;
-        this.createdFromTag = createdFromTag;
-        this.createdFromSnapshot = createdFromSnapshot;
-        this.createTime = createTime;
-    }
-
-    public String getBranchName() {
-        return branchName;
-    }
-
-    public String getCreatedFromTag() {
-        return createdFromTag;
-    }
-
-    public Long getCreatedFromSnapshot() {
-        return createdFromSnapshot;
-    }
-
-    public long getCreateTime() {
-        return createTime;
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
index 2e3f5e90a..ec1608d95 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.table.system;
 
-import org.apache.paimon.branch.TableBranch;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -28,6 +28,8 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.ReadonlyTable;
@@ -42,21 +44,33 @@ import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.IteratorRecordReader;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.ProjectedRow;
 import org.apache.paimon.utils.SerializationUtils;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
+import static org.apache.paimon.utils.BranchManager.BRANCH_PREFIX;
+import static org.apache.paimon.utils.BranchManager.branchPath;
+import static org.apache.paimon.utils.FileUtils.listVersionedDirectories;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** A {@link Table} for showing branches of table. */
 public class BranchesTable implements ReadonlyTable {
@@ -189,25 +203,78 @@ public class BranchesTable implements ReadonlyTable {
             if (!(split instanceof BranchesSplit)) {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
+
             Path location = ((BranchesSplit) split).location;
             FileStoreTable table = FileStoreTableFactory.create(fileIO, 
location);
-            List<TableBranch> branches = table.branchManager().branches();
-            Iterator<InternalRow> rows = 
Iterators.transform(branches.iterator(), this::toRow);
+            Iterator<InternalRow> rows;
+            try {
+                rows = branches(table).iterator();
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+
             if (projection != null) {
                 rows =
                         Iterators.transform(
                                 rows, row -> 
ProjectedRow.from(projection).replaceRow(row));
             }
+
             return new IteratorRecordReader<>(rows);
         }
 
-        private InternalRow toRow(TableBranch branch) {
-            return GenericRow.of(
-                    BinaryString.fromString(branch.getBranchName()),
-                    BinaryString.fromString(branch.getCreatedFromTag()),
-                    branch.getCreatedFromSnapshot(),
-                    Timestamp.fromLocalDateTime(
-                            
DateTimeUtils.toLocalDateTime(branch.getCreateTime())));
+        private List<InternalRow> branches(FileStoreTable table) throws 
IOException {
+            BranchManager branchManager = table.branchManager();
+            SchemaManager schemaManager = new SchemaManager(fileIO, 
table.location());
+
+            List<Pair<Path, Long>> paths =
+                    listVersionedDirectories(fileIO, 
branchManager.branchDirectory(), BRANCH_PREFIX)
+                            .map(status -> Pair.of(status.getPath(), 
status.getModificationTime()))
+                            .collect(Collectors.toList());
+            List<InternalRow> result = new ArrayList<>();
+
+            for (Pair<Path, Long> path : paths) {
+                String branchName = 
path.getLeft().getName().substring(BRANCH_PREFIX.length());
+                String basedTag = null;
+                Long basedSnapshotId = null;
+                long creationTime = path.getRight();
+
+                Optional<TableSchema> tableSchema =
+                        schemaManager.copyWithBranch(branchName).latest();
+                if (tableSchema.isPresent()) {
+                    FileStoreTable branchTable =
+                            FileStoreTableFactory.create(
+                                    fileIO, new 
Path(branchPath(table.location(), branchName)));
+                    SortedMap<Snapshot, List<String>> snapshotTags =
+                            branchTable.tagManager().tags();
+                    Long earliestSnapshotId = 
branchTable.snapshotManager().earliestSnapshotId();
+                    if (snapshotTags.isEmpty()) {
+                        // create based on snapshotId
+                        basedSnapshotId = earliestSnapshotId;
+                    } else {
+                        Snapshot snapshot = snapshotTags.firstKey();
+                        if (Objects.equals(earliestSnapshotId, snapshot.id())) 
{
+                            // create based on tag
+                            List<String> tags = snapshotTags.get(snapshot);
+                            checkArgument(tags.size() == 1);
+                            basedTag = tags.get(0);
+                            basedSnapshotId = snapshot.id();
+                        } else {
+                            // create based on snapshotId
+                            basedSnapshotId = earliestSnapshotId;
+                        }
+                    }
+                }
+
+                result.add(
+                        GenericRow.of(
+                                BinaryString.fromString(branchName),
+                                BinaryString.fromString(basedTag),
+                                basedSnapshotId,
+                                Timestamp.fromLocalDateTime(
+                                        
DateTimeUtils.toLocalDateTime(creationTime))));
+            }
+
+            return result;
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index 6ff8d4c2a..af598587c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -19,25 +19,17 @@
 package org.apache.paimon.utils;
 
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.branch.TableBranch;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.List;
-import java.util.Optional;
-import java.util.PriorityQueue;
-import java.util.SortedMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -315,51 +307,11 @@ public class BranchManager {
     }
 
     /** Get all branches for the table. */
-    public List<TableBranch> branches() {
+    public List<String> branches() {
         try {
-            List<Pair<Path, Long>> paths =
-                    listVersionedDirectories(fileIO, branchDirectory(), 
BRANCH_PREFIX)
-                            .map(status -> Pair.of(status.getPath(), 
status.getModificationTime()))
-                            .collect(Collectors.toList());
-            PriorityQueue<TableBranch> pq =
-                    new 
PriorityQueue<>(Comparator.comparingLong(TableBranch::getCreateTime));
-            for (Pair<Path, Long> path : paths) {
-                String branchName = 
path.getLeft().getName().substring(BRANCH_PREFIX.length());
-                Optional<TableSchema> tableSchema =
-                        schemaManager.copyWithBranch(branchName).latest();
-                if (!tableSchema.isPresent()) {
-                    // Support empty branch.
-                    pq.add(new TableBranch(branchName, path.getValue()));
-                    continue;
-                }
-                FileStoreTable branchTable =
-                        FileStoreTableFactory.create(
-                                fileIO, new Path(branchPath(tablePath, 
branchName)));
-                SortedMap<Snapshot, List<String>> snapshotTags = 
branchTable.tagManager().tags();
-                Long earliestSnapshotId = 
branchTable.snapshotManager().earliestSnapshotId();
-                if (snapshotTags.isEmpty()) {
-                    // Create based on snapshotId.
-                    pq.add(new TableBranch(branchName, earliestSnapshotId, 
path.getValue()));
-                } else {
-                    Snapshot snapshot = snapshotTags.firstKey();
-                    if (earliestSnapshotId == snapshot.id()) {
-                        List<String> tags = snapshotTags.get(snapshot);
-                        checkArgument(tags.size() == 1);
-                        pq.add(
-                                new TableBranch(
-                                        branchName, tags.get(0), 
snapshot.id(), path.getValue()));
-                    } else {
-                        // Create based on snapshotId.
-                        pq.add(new TableBranch(branchName, earliestSnapshotId, 
path.getValue()));
-                    }
-                }
-            }
-
-            List<TableBranch> branches = new ArrayList<>(pq.size());
-            while (!pq.isEmpty()) {
-                branches.add(pq.poll());
-            }
-            return branches;
+            return listVersionedDirectories(fileIO, branchDirectory(), 
BRANCH_PREFIX)
+                    .map(status -> 
status.getPath().getName().substring(BRANCH_PREFIX.length()))
+                    .collect(Collectors.toList());
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index d005dc4e5..33aca03b8 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -18,8 +18,6 @@
 
 package org.apache.paimon.flink;
 
-import org.apache.paimon.branch.TableBranch;
-import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -137,8 +135,7 @@ public class BranchSqlITCase extends CatalogITCaseBase {
     }
 
     @Test
-    public void testCreateBranchFromSnapshot() throws 
Catalog.TableNotExistException {
-
+    public void testCreateBranchFromSnapshot() throws Exception {
         sql(
                 "CREATE TABLE T ("
                         + " pt INT"
@@ -158,12 +155,8 @@ public class BranchSqlITCase extends CatalogITCaseBase {
         sql("CALL sys.create_branch('default.T', 'test', 1)");
         sql("CALL sys.create_branch('default.T', 'test2', 2)");
 
-        FileStoreTable table = paimonTable("T");
-
-        assertThat(
-                        table.branchManager().branches().stream()
-                                .map(TableBranch::getCreatedFromSnapshot))
-                .containsExactlyInAnyOrder(1L, 2L);
+        assertThat(collectResult("SELECT created_from_snapshot FROM 
`T$branches`"))
+                .containsExactlyInAnyOrder("+I[1]", "+I[2]");
 
         
assertThat(paimonTable("T$branch_test").snapshotManager().snapshotExists(1))
                 .isEqualTo(true);
@@ -223,25 +216,17 @@ public class BranchSqlITCase extends CatalogITCaseBase {
         sql("CALL sys.create_branch('default.T', 'test', 1)");
         sql("CALL sys.create_branch('default.T', 'test2', 2)");
 
-        FileStoreTable table = paimonTable("T");
-
-        assertThat(
-                        table.branchManager().branches().stream()
-                                .map(TableBranch::getCreatedFromSnapshot))
-                .containsExactlyInAnyOrder(1L, 2L);
-
-        
assertThat(table.branchManager().branches().stream().map(TableBranch::getBranchName))
-                .containsExactlyInAnyOrder("test", "test2");
+        assertThat(collectResult("SELECT branch_name, created_from_snapshot 
FROM `T$branches`"))
+                .containsExactlyInAnyOrder("+I[test, 1]", "+I[test2, 2]");
 
         sql("CALL sys.delete_branch('default.T', 'test')");
 
-        
assertThat(table.branchManager().branches().stream().map(TableBranch::getBranchName))
-                .containsExactlyInAnyOrder("test2");
+        assertThat(collectResult("SELECT branch_name, created_from_snapshot 
FROM `T$branches`"))
+                .containsExactlyInAnyOrder("+I[test2, 2]");
     }
 
     @Test
-    public void testBranchManagerGetBranchSnapshotsList()
-            throws Catalog.TableNotExistException, IOException {
+    public void testBranchManagerGetBranchSnapshotsList() throws Exception {
         sql(
                 "CREATE TABLE T ("
                         + " pt INT"
@@ -263,10 +248,8 @@ public class BranchSqlITCase extends CatalogITCaseBase {
         sql("CALL sys.create_branch('default.T', 'test2', 2)");
         sql("CALL sys.create_branch('default.T', 'test3', 3)");
 
-        assertThat(
-                        table.branchManager().branches().stream()
-                                .map(TableBranch::getCreatedFromSnapshot))
-                .containsExactlyInAnyOrder(1L, 2L, 3L);
+        assertThat(collectResult("SELECT created_from_snapshot FROM 
`T$branches`"))
+                .containsExactlyInAnyOrder("+I[1]", "+I[2]", "+I[3]");
     }
 
     @Test
@@ -370,7 +353,7 @@ public class BranchSqlITCase extends CatalogITCaseBase {
     }
 
     @Test
-    public void testDifferentRowTypes() throws Exception {
+    public void testDifferentRowTypes() {
         sql(
                 "CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) 
PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )");
         sql("CALL sys.create_branch('default.t', 'pk')");
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 7156fbba2..c5473446b 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -40,6 +40,8 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.TableType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
@@ -168,11 +170,13 @@ public class HiveCatalog extends AbstractCatalog {
 
     @Override
     public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier 
identifier) {
+        Identifier tableIdentifier =
+                new Identifier(identifier.getDatabaseName(), 
identifier.getTableName());
         try {
             return Optional.of(
                     new HiveMetastoreClient.Factory(
-                            identifier,
-                            getDataTableSchema(identifier),
+                            tableIdentifier,
+                            getDataTableSchema(tableIdentifier),
                             hiveConf,
                             clientClassName,
                             options));
@@ -284,18 +288,33 @@ public class HiveCatalog extends AbstractCatalog {
         }
     }
 
+    private Map<String, String> convertToProperties(Database database) {
+        Map<String, String> properties = new 
HashMap<>(database.getParameters());
+        if (database.getLocationUri() != null) {
+            properties.put(DB_LOCATION_PROP, database.getLocationUri());
+        }
+        if (database.getDescription() != null) {
+            properties.put(COMMENT_PROP, database.getDescription());
+        }
+        return properties;
+    }
+
     @Override
     public void dropPartition(Identifier identifier, Map<String, String> 
partitionSpec)
             throws TableNotExistException {
         TableSchema tableSchema = getDataTableSchema(identifier);
         if (!tableSchema.partitionKeys().isEmpty()
-                && new 
CoreOptions(tableSchema.options()).partitionedTableInMetastore()) {
-
+                && new 
CoreOptions(tableSchema.options()).partitionedTableInMetastore()
+                && !partitionExistsInOtherBranches(identifier, partitionSpec)) 
{
             try {
                 // Do not close client, it is for HiveCatalog
                 @SuppressWarnings("resource")
                 HiveMetastoreClient metastoreClient =
-                        new HiveMetastoreClient(identifier, tableSchema, 
clients);
+                        new HiveMetastoreClient(
+                                new Identifier(
+                                        identifier.getDatabaseName(), 
identifier.getTableName()),
+                                tableSchema,
+                                clients);
                 metastoreClient.deletePartition(new 
LinkedHashMap<>(partitionSpec));
             } catch (Exception e) {
                 throw new RuntimeException(e);
@@ -304,15 +323,36 @@ public class HiveCatalog extends AbstractCatalog {
         super.dropPartition(identifier, partitionSpec);
     }
 
-    private Map<String, String> convertToProperties(Database database) {
-        Map<String, String> properties = new 
HashMap<>(database.getParameters());
-        if (database.getLocationUri() != null) {
-            properties.put(DB_LOCATION_PROP, database.getLocationUri());
-        }
-        if (database.getDescription() != null) {
-            properties.put(COMMENT_PROP, database.getDescription());
+    private boolean partitionExistsInOtherBranches(
+            Identifier identifier, Map<String, String> partitionSpec)
+            throws TableNotExistException {
+        FileStoreTable mainTable =
+                (FileStoreTable)
+                        getTable(
+                                new Identifier(
+                                        identifier.getDatabaseName(), 
identifier.getTableName()));
+        List<String> branchNames = new 
ArrayList<>(mainTable.branchManager().branches());
+        branchNames.add(DEFAULT_MAIN_BRANCH);
+
+        for (String branchName : branchNames) {
+            if (branchName.equals(identifier.getBranchNameOrDefault())) {
+                continue;
+            }
+
+            Optional<TableSchema> branchSchema =
+                    tableSchemaInFileSystem(mainTable.location(), branchName);
+            if (!branchSchema.isPresent()) {
+                continue;
+            }
+
+            FileStoreTable table =
+                    FileStoreTableFactory.create(
+                            mainTable.fileIO(), mainTable.location(), 
branchSchema.get());
+            if 
(!table.newScan().withPartitionFilter(partitionSpec).listPartitions().isEmpty())
 {
+                return true;
+            }
         }
-        return properties;
+        return false;
     }
 
     @Override
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index e0e19b9ba..3dd8cb251 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -485,11 +485,58 @@ public abstract class HiveCatalogITCaseBase {
                 .containsExactlyInAnyOrder("3\tx3", "3\tx33", "4\tx4");
     }
 
+    @Test
+    public void testDropPartitionFromBranch() throws Exception {
+        testDropPartitionFromBranchImpl();
+    }
+
+    @Test
+    @LocationInProperties
+    public void testDropPartitionFromBranchLocationInProperties() throws 
Exception {
+        testDropPartitionFromBranchImpl();
+    }
+
+    private void testDropPartitionFromBranchImpl() throws Exception {
+        tEnv.executeSql(
+                        "CREATE TABLE t ( pt INT, v STRING ) PARTITIONED BY 
(pt) "
+                                + "WITH ( 'file.format' = 'avro', 
'metastore.partitioned-table' = 'true' )")
+                .await();
+        tEnv.executeSql("CALL sys.create_branch('test_db.t', 'test')").await();
+
+        tEnv.executeSql("INSERT INTO t VALUES (1, 'apple'), (2, 'banana'), (4, 
'mango')").await();
+        tEnv.executeSql("INSERT INTO `t$branch_test` VALUES (1, 'cat'), (3, 
'dog'), (4, 'lion')")
+                .await();
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
+                .containsExactlyInAnyOrder("pt=1", "pt=2", "pt=3", "pt=4");
+
+        tEnv.executeSql("ALTER TABLE `t$branch_test` DROP PARTITION (pt = 1)");
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
+                .containsExactlyInAnyOrder("pt=1", "pt=2", "pt=3", "pt=4");
+
+        tEnv.executeSql("ALTER TABLE `t$branch_test` DROP PARTITION (pt = 3)");
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
+                .containsExactlyInAnyOrder("pt=1", "pt=2", "pt=4");
+
+        tEnv.executeSql("ALTER TABLE t DROP PARTITION (pt = 1)");
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
+                .containsExactlyInAnyOrder("pt=2", "pt=4");
+
+        tEnv.executeSql("ALTER TABLE t DROP PARTITION (pt = 4)");
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
+                .containsExactlyInAnyOrder("pt=2", "pt=4");
+
+        tEnv.executeSql("ALTER TABLE `t$branch_test` DROP PARTITION (pt = 4)");
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS 
t")).containsExactlyInAnyOrder("pt=2");
+
+        tEnv.executeSql("ALTER TABLE t DROP PARTITION (pt = 2)");
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")).isEmpty();
+    }
+
     @Test
     public void testFallbackBranchRead() throws Exception {
         tEnv.executeSql(
                         "CREATE TABLE t ( pt INT, a INT, b STRING ) 
PARTITIONED BY (pt) "
-                                + "WITH ( 'file.format' = 'avro' )")
+                                + "WITH ( 'file.format' = 'avro', 
'metastore.partitioned-table' = 'true' )")
                 .await();
         tEnv.executeSql("CALL sys.create_branch('test_db.t', 'test')").await();
         tEnv.executeSql(
@@ -510,9 +557,11 @@ public abstract class HiveCatalogITCaseBase {
                         Row.of(1, 20, "banana"),
                         Row.of(2, 10, "lion"),
                         Row.of(2, 20, "wolf"));
-        assertThat(hiveShell.executeQuery("SELECT * FROM t"))
+        assertThat(hiveShell.executeQuery("SELECT pt, a, b FROM t"))
                 .containsExactlyInAnyOrder(
                         "1\t10\tapple", "1\t20\tbanana", "2\t10\tlion", 
"2\t20\twolf");
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
+                .containsExactlyInAnyOrder("pt=1", "pt=2");
     }
 
     /**

Reply via email to