This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e570dc26 [core] support to drop partition when delete tag (#3042)
0e570dc26 is described below

commit 0e570dc26719f2b5d9757fd7b1be7230eb7475c4
Author: Aitozi <[email protected]>
AuthorDate: Wed Mar 20 13:50:56 2024 +0800

    [core] support to drop partition when delete tag (#3042)
---
 .../paimon/metastore/AddPartitionTagCallback.java  | 11 +++++++++
 .../apache/paimon/metastore/MetastoreClient.java   |  2 ++
 .../paimon/table/AbstractFileStoreTable.java       |  7 +++++-
 .../org/apache/paimon/table/sink/TagCallback.java  |  2 ++
 .../org/apache/paimon/tag/TagAutoCreation.java     |  5 ++++-
 .../java/org/apache/paimon/utils/TagManager.java   | 20 ++++++++++++++---
 .../apache/paimon/operation/FileDeletionTest.java  |  6 +++--
 .../operation/UncleanedFileStoreExpireTest.java    |  6 ++++-
 .../sink/AutoTagForSavepointCommitterOperator.java |  2 +-
 .../flink/sink/BatchWriteGeneratorTagOperator.java | 12 +++++++---
 .../sink/BatchWriteGeneratorTagOperatorTest.java   |  1 +
 .../apache/paimon/hive/HiveMetastoreClient.java    | 14 ++++++++++++
 .../apache/paimon/hive/HiveCatalogITCaseBase.java  | 26 ++++++++++++++++++++++
 13 files changed, 102 insertions(+), 12 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
index 33f5ed5a9..70efe68e8 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
@@ -44,6 +44,17 @@ public class AddPartitionTagCallback implements TagCallback {
         }
     }
 
+    @Override
+    public void notifyDeletion(String tagName) {
+        LinkedHashMap<String, String> partitionSpec = new LinkedHashMap<>();
+        partitionSpec.put(partitionField, tagName);
+        try {
+            client.deletePartition(partitionSpec);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     public void close() throws Exception {
         client.close();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java 
b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
index 615e78330..9247d4923 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
@@ -33,6 +33,8 @@ public interface MetastoreClient extends AutoCloseable {
 
     void addPartition(LinkedHashMap<String, String> partitionSpec) throws 
Exception;
 
+    void deletePartition(LinkedHashMap<String, String> partitionSpec) throws 
Exception;
+
     /** Factory to create {@link MetastoreClient}. */
     interface Factory extends Serializable {
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index d2c840ba9..ac0f798a4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -462,7 +462,12 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
 
     @Override
     public void deleteTag(String tagName) {
-        tagManager().deleteTag(tagName, store().newTagDeletion(), 
snapshotManager());
+        tagManager()
+                .deleteTag(
+                        tagName,
+                        store().newTagDeletion(),
+                        snapshotManager(),
+                        store().createTagCallbacks());
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java
index 397b341d9..1d20bb89d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java
@@ -26,4 +26,6 @@ package org.apache.paimon.table.sink;
 public interface TagCallback extends AutoCloseable {
 
     void notifyCreation(String tagName);
+
+    void notifyDeletion(String tagName);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
index 31a9b1997..505454313 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
@@ -155,7 +155,10 @@ public class TagAutoCreation {
                     int i = 0;
                     for (List<String> tag : tags.values()) {
                         tagManager.deleteTag(
-                                checkAndGetOneAutoTag(tag), tagDeletion, 
snapshotManager);
+                                checkAndGetOneAutoTag(tag),
+                                tagDeletion,
+                                snapshotManager,
+                                callbacks);
                         i++;
                         if (i == toDelete) {
                             break;
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 90f690053..134dea459 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -126,7 +126,10 @@ public class TagManager {
     }
 
     public void deleteTag(
-            String tagName, TagDeletion tagDeletion, SnapshotManager 
snapshotManager) {
+            String tagName,
+            TagDeletion tagDeletion,
+            SnapshotManager snapshotManager,
+            List<TagCallback> callbacks) {
         checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is 
blank.", tagName);
         checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);
 
@@ -135,12 +138,12 @@ public class TagManager {
 
         // skip file deletion if snapshot exists
         if (snapshotManager.snapshotExists(taggedSnapshot.id())) {
-            fileIO.deleteQuietly(tagPath(tagName));
+            deleteTagMetaFile(tagName, callbacks);
             return;
         } else {
             // FileIO discovers tags by tag file, so we should read all tags 
before we delete tag
             SortedMap<Snapshot, List<String>> tags = tags();
-            fileIO.deleteQuietly(tagPath(tagName));
+            deleteTagMetaFile(tagName, callbacks);
 
             // skip data file clean if more than 1 tags are created based on 
this snapshot
             if (tags.get(taggedSnapshot).size() > 1) {
@@ -152,6 +155,17 @@ public class TagManager {
         doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion);
     }
 
+    private void deleteTagMetaFile(String tagName, List<TagCallback> 
callbacks) {
+        fileIO.deleteQuietly(tagPath(tagName));
+        try {
+            callbacks.forEach(callback -> callback.notifyDeletion(tagName));
+        } finally {
+            for (TagCallback tagCallback : callbacks) {
+                IOUtils.closeQuietly(tagCallback);
+            }
+        }
+    }
+
     private void doClean(
             Snapshot taggedSnapshot,
             List<Snapshot> taggedSnapshots,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 9994c0809..ec73eb317 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -425,7 +425,8 @@ public class FileDeletionTest {
             assertPathExists(fileIO, 
pathFactory.toManifestListPath(manifestListName));
         }
 
-        tagManager.deleteTag("tag1", store.newTagDeletion(), snapshotManager);
+        tagManager.deleteTag(
+                "tag1", store.newTagDeletion(), snapshotManager, 
Collections.emptyList());
 
         // check data files
         assertPathNotExists(fileIO, pathFactory.bucketPath(partition, 0));
@@ -501,7 +502,8 @@ public class FileDeletionTest {
             assertPathExists(fileIO, 
pathFactory.toManifestListPath(manifestListName));
         }
 
-        tagManager.deleteTag("tag2", store.newTagDeletion(), snapshotManager);
+        tagManager.deleteTag(
+                "tag2", store.newTagDeletion(), snapshotManager, 
Collections.emptyList());
 
         // check data files
         assertPathExists(fileIO, pathFactory.bucketPath(partition, 0));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
index ce93166a5..9f5ccb81c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
@@ -109,7 +109,11 @@ public class UncleanedFileStoreExpireTest extends 
FileStoreExpireTestBase {
         // randomly delete tags
         for (int id = 1; id <= latestSnapshotId; id++) {
             if (random.nextBoolean()) {
-                tagManager.deleteTag("tag" + id, store.newTagDeletion(), 
snapshotManager);
+                tagManager.deleteTag(
+                        "tag" + id,
+                        store.newTagDeletion(),
+                        snapshotManager,
+                        Collections.emptyList());
             }
         }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
index da3425e9b..dcf2c8b00 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
@@ -156,7 +156,7 @@ public class AutoTagForSavepointCommitterOperator<CommitT, 
GlobalCommitT>
         identifiersForTags.remove(checkpointId);
         String tagName = SAVEPOINT_TAG_PREFIX + checkpointId;
         if (tagManager.tagExists(tagName)) {
-            tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
+            tagManager.deleteTag(tagName, tagDeletion, snapshotManager, 
callbacks);
         }
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
index d65ab7414..2c898831e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
@@ -114,7 +114,8 @@ public class BatchWriteGeneratorTagOperator<CommitT, 
GlobalCommitT>
         try {
             // If the tag already exists, delete the tag
             if (tagManager.tagExists(tagName)) {
-                tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
+                tagManager.deleteTag(
+                        tagName, tagDeletion, snapshotManager, 
table.store().createTagCallbacks());
             }
             // Create a new tag
             tagManager.createTag(snapshot, tagName, 
table.store().createTagCallbacks());
@@ -122,7 +123,8 @@ public class BatchWriteGeneratorTagOperator<CommitT, 
GlobalCommitT>
             expireTag();
         } catch (Exception e) {
             if (tagManager.tagExists(tagName)) {
-                tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
+                tagManager.deleteTag(
+                        tagName, tagDeletion, snapshotManager, 
table.store().createTagCallbacks());
             }
         }
     }
@@ -147,7 +149,11 @@ public class BatchWriteGeneratorTagOperator<CommitT, 
GlobalCommitT>
                     } else {
                         List<String> sortedTagNames = 
tagManager.sortTagsOfOneSnapshot(tagNames);
                         for (String toBeDeleted : sortedTagNames) {
-                            tagManager.deleteTag(toBeDeleted, tagDeletion, 
snapshotManager);
+                            tagManager.deleteTag(
+                                    toBeDeleted,
+                                    tagDeletion,
+                                    snapshotManager,
+                                    table.store().createTagCallbacks());
                             tagCount--;
                             if (tagCount == tagNumRetainedMax) {
                                 break;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
index f020f65bd..d6d7f434b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
@@ -63,6 +63,7 @@ public class BatchWriteGeneratorTagOperatorTest extends 
CommitterOperatorTest {
                                 () ->
                                         new VersionedSerializerWrapper<>(
                                                 new 
ManifestCommittableSerializer())));
+        committerOperator.open();
 
         TableCommitImpl tableCommit = table.newCommit(initialCommitUser);
 
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
index d24944b34..031b1848a 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
@@ -91,6 +91,20 @@ public class HiveMetastoreClient implements MetastoreClient {
         }
     }
 
+    @Override
+    public void deletePartition(LinkedHashMap<String, String> partitionSpec) 
throws Exception {
+        List<String> partitionValues = new ArrayList<>(partitionSpec.values());
+        try {
+            client.dropPartition(
+                    identifier.getDatabaseName(),
+                    identifier.getObjectName(),
+                    partitionValues,
+                    false);
+        } catch (NoSuchObjectException e) {
+            // do nothing if the partition not exists
+        }
+    }
+
     @Override
     public void close() throws Exception {
         client.close();
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 80d6c5460..aacd9087c 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -933,6 +933,32 @@ public abstract class HiveCatalogITCaseBase {
                         "4\t40\t2023-10-17");
     }
 
+    @Test
+    public void testDeletePartitionForTag() throws Exception {
+        tEnv.executeSql(
+                "CREATE TABLE t (\n"
+                        + "    k INT,\n"
+                        + "    v BIGINT,\n"
+                        + "    PRIMARY KEY (k) NOT ENFORCED\n"
+                        + ") WITH (\n"
+                        + "    'bucket' = '2',\n"
+                        + "    'metastore.tag-to-partition' = 'dt'\n"
+                        + ")");
+        tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await();
+        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', 1)");
+        tEnv.executeSql("INSERT INTO t VALUES (3, 30)").await();
+        tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', 2)");
+
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
+                .containsExactlyInAnyOrder("dt=2023-10-16", "dt=2023-10-17");
+
+        tEnv.executeSql("CALL sys.delete_tag('test_db.t', '2023-10-16')");
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
+                .containsExactlyInAnyOrder("dt=2023-10-17");
+
+        assertThat(hiveShell.executeQuery("SELECT k, v FROM t WHERE 
dt='2023-10-16'")).isEmpty();
+    }
+
     @Test
     public void testHistoryPartitionsCascadeToUpdate() throws Exception {
         tEnv.executeSql(

Reply via email to