This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0e570dc26 [core] support to drop partition when delete tag (#3042)
0e570dc26 is described below
commit 0e570dc26719f2b5d9757fd7b1be7230eb7475c4
Author: Aitozi <[email protected]>
AuthorDate: Wed Mar 20 13:50:56 2024 +0800
[core] support to drop partition when delete tag (#3042)
---
.../paimon/metastore/AddPartitionTagCallback.java | 11 +++++++++
.../apache/paimon/metastore/MetastoreClient.java | 2 ++
.../paimon/table/AbstractFileStoreTable.java | 7 +++++-
.../org/apache/paimon/table/sink/TagCallback.java | 2 ++
.../org/apache/paimon/tag/TagAutoCreation.java | 5 ++++-
.../java/org/apache/paimon/utils/TagManager.java | 20 ++++++++++++++---
.../apache/paimon/operation/FileDeletionTest.java | 6 +++--
.../operation/UncleanedFileStoreExpireTest.java | 6 ++++-
.../sink/AutoTagForSavepointCommitterOperator.java | 2 +-
.../flink/sink/BatchWriteGeneratorTagOperator.java | 12 +++++++---
.../sink/BatchWriteGeneratorTagOperatorTest.java | 1 +
.../apache/paimon/hive/HiveMetastoreClient.java | 14 ++++++++++++
.../apache/paimon/hive/HiveCatalogITCaseBase.java | 26 ++++++++++++++++++++++
13 files changed, 102 insertions(+), 12 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
index 33f5ed5a9..70efe68e8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
+++
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java
@@ -44,6 +44,17 @@ public class AddPartitionTagCallback implements TagCallback {
}
}
+ @Override
+ public void notifyDeletion(String tagName) {
+ LinkedHashMap<String, String> partitionSpec = new LinkedHashMap<>();
+ partitionSpec.put(partitionField, tagName);
+ try {
+ client.deletePartition(partitionSpec);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Override
public void close() throws Exception {
client.close();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
index 615e78330..9247d4923 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java
@@ -33,6 +33,8 @@ public interface MetastoreClient extends AutoCloseable {
void addPartition(LinkedHashMap<String, String> partitionSpec) throws
Exception;
+ void deletePartition(LinkedHashMap<String, String> partitionSpec) throws Exception;
+
/** Factory to create {@link MetastoreClient}. */
interface Factory extends Serializable {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index d2c840ba9..ac0f798a4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -462,7 +462,12 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
@Override
public void deleteTag(String tagName) {
- tagManager().deleteTag(tagName, store().newTagDeletion(), snapshotManager());
+ tagManager()
+ .deleteTag(
+ tagName,
+ store().newTagDeletion(),
+ snapshotManager(),
+ store().createTagCallbacks());
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java
index 397b341d9..1d20bb89d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java
@@ -26,4 +26,6 @@ package org.apache.paimon.table.sink;
public interface TagCallback extends AutoCloseable {
void notifyCreation(String tagName);
+
+ void notifyDeletion(String tagName);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
index 31a9b1997..505454313 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
@@ -155,7 +155,10 @@ public class TagAutoCreation {
int i = 0;
for (List<String> tag : tags.values()) {
tagManager.deleteTag(
- checkAndGetOneAutoTag(tag), tagDeletion, snapshotManager);
+ checkAndGetOneAutoTag(tag),
+ tagDeletion,
+ snapshotManager,
+ callbacks);
i++;
if (i == toDelete) {
break;
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 90f690053..134dea459 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -126,7 +126,10 @@ public class TagManager {
}
public void deleteTag(
- String tagName, TagDeletion tagDeletion, SnapshotManager snapshotManager) {
+ String tagName,
+ TagDeletion tagDeletion,
+ SnapshotManager snapshotManager,
+ List<TagCallback> callbacks) {
checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is
blank.", tagName);
checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);
@@ -135,12 +138,12 @@ public class TagManager {
// skip file deletion if snapshot exists
if (snapshotManager.snapshotExists(taggedSnapshot.id())) {
- fileIO.deleteQuietly(tagPath(tagName));
+ deleteTagMetaFile(tagName, callbacks);
return;
} else {
// FileIO discovers tags by tag file, so we should read all tags
before we delete tag
SortedMap<Snapshot, List<String>> tags = tags();
- fileIO.deleteQuietly(tagPath(tagName));
+ deleteTagMetaFile(tagName, callbacks);
// skip data file clean if more than 1 tags are created based on
this snapshot
if (tags.get(taggedSnapshot).size() > 1) {
@@ -152,6 +155,17 @@ public class TagManager {
doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion);
}
+ private void deleteTagMetaFile(String tagName, List<TagCallback> callbacks) {
+ fileIO.deleteQuietly(tagPath(tagName));
+ try {
+ callbacks.forEach(callback -> callback.notifyDeletion(tagName));
+ } finally {
+ for (TagCallback tagCallback : callbacks) {
+ IOUtils.closeQuietly(tagCallback);
+ }
+ }
+ }
+
private void doClean(
Snapshot taggedSnapshot,
List<Snapshot> taggedSnapshots,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 9994c0809..ec73eb317 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -425,7 +425,8 @@ public class FileDeletionTest {
assertPathExists(fileIO,
pathFactory.toManifestListPath(manifestListName));
}
- tagManager.deleteTag("tag1", store.newTagDeletion(), snapshotManager);
+ tagManager.deleteTag(
+ "tag1", store.newTagDeletion(), snapshotManager, Collections.emptyList());
// check data files
assertPathNotExists(fileIO, pathFactory.bucketPath(partition, 0));
@@ -501,7 +502,8 @@ public class FileDeletionTest {
assertPathExists(fileIO,
pathFactory.toManifestListPath(manifestListName));
}
- tagManager.deleteTag("tag2", store.newTagDeletion(), snapshotManager);
+ tagManager.deleteTag(
+ "tag2", store.newTagDeletion(), snapshotManager, Collections.emptyList());
// check data files
assertPathExists(fileIO, pathFactory.bucketPath(partition, 0));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
index ce93166a5..9f5ccb81c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
@@ -109,7 +109,11 @@ public class UncleanedFileStoreExpireTest extends
FileStoreExpireTestBase {
// randomly delete tags
for (int id = 1; id <= latestSnapshotId; id++) {
if (random.nextBoolean()) {
- tagManager.deleteTag("tag" + id, store.newTagDeletion(), snapshotManager);
+ tagManager.deleteTag(
+ "tag" + id,
+ store.newTagDeletion(),
+ snapshotManager,
+ Collections.emptyList());
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
index da3425e9b..dcf2c8b00 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
@@ -156,7 +156,7 @@ public class AutoTagForSavepointCommitterOperator<CommitT,
GlobalCommitT>
identifiersForTags.remove(checkpointId);
String tagName = SAVEPOINT_TAG_PREFIX + checkpointId;
if (tagManager.tagExists(tagName)) {
- tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
+ tagManager.deleteTag(tagName, tagDeletion, snapshotManager, callbacks);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
index d65ab7414..2c898831e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
@@ -114,7 +114,8 @@ public class BatchWriteGeneratorTagOperator<CommitT,
GlobalCommitT>
try {
// If the tag already exists, delete the tag
if (tagManager.tagExists(tagName)) {
- tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
+ tagManager.deleteTag(
+ tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks());
}
// Create a new tag
tagManager.createTag(snapshot, tagName,
table.store().createTagCallbacks());
@@ -122,7 +123,8 @@ public class BatchWriteGeneratorTagOperator<CommitT,
GlobalCommitT>
expireTag();
} catch (Exception e) {
if (tagManager.tagExists(tagName)) {
- tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
+ tagManager.deleteTag(
+ tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks());
}
}
}
@@ -147,7 +149,11 @@ public class BatchWriteGeneratorTagOperator<CommitT,
GlobalCommitT>
} else {
List<String> sortedTagNames =
tagManager.sortTagsOfOneSnapshot(tagNames);
for (String toBeDeleted : sortedTagNames) {
- tagManager.deleteTag(toBeDeleted, tagDeletion, snapshotManager);
+ tagManager.deleteTag(
+ toBeDeleted,
+ tagDeletion,
+ snapshotManager,
+ table.store().createTagCallbacks());
tagCount--;
if (tagCount == tagNumRetainedMax) {
break;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
index f020f65bd..d6d7f434b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
@@ -63,6 +63,7 @@ public class BatchWriteGeneratorTagOperatorTest extends
CommitterOperatorTest {
() ->
new VersionedSerializerWrapper<>(
new
ManifestCommittableSerializer())));
+ committerOperator.open();
TableCommitImpl tableCommit = table.newCommit(initialCommitUser);
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
index d24944b34..031b1848a 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
@@ -91,6 +91,20 @@ public class HiveMetastoreClient implements MetastoreClient {
}
}
+ @Override
+ public void deletePartition(LinkedHashMap<String, String> partitionSpec) throws Exception {
+ List<String> partitionValues = new ArrayList<>(partitionSpec.values());
+ try {
+ client.dropPartition(
+ identifier.getDatabaseName(),
+ identifier.getObjectName(),
+ partitionValues,
+ false);
+ } catch (NoSuchObjectException e) {
+ // do nothing if the partition not exists
+ }
+ }
+
@Override
public void close() throws Exception {
client.close();
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 80d6c5460..aacd9087c 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -933,6 +933,32 @@ public abstract class HiveCatalogITCaseBase {
"4\t40\t2023-10-17");
}
+ @Test
+ public void testDeletePartitionForTag() throws Exception {
+ tEnv.executeSql(
+ "CREATE TABLE t (\n"
+ + " k INT,\n"
+ + " v BIGINT,\n"
+ + " PRIMARY KEY (k) NOT ENFORCED\n"
+ + ") WITH (\n"
+ + " 'bucket' = '2',\n"
+ + " 'metastore.tag-to-partition' = 'dt'\n"
+ + ")");
+ tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await();
+ tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', 1)");
+ tEnv.executeSql("INSERT INTO t VALUES (3, 30)").await();
+ tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', 2)");
+
+ assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
+ .containsExactlyInAnyOrder("dt=2023-10-16", "dt=2023-10-17");
+
+ tEnv.executeSql("CALL sys.delete_tag('test_db.t', '2023-10-16')");
+ assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
+ .containsExactlyInAnyOrder("dt=2023-10-17");
+
+ assertThat(hiveShell.executeQuery("SELECT k, v FROM t WHERE dt='2023-10-16'")).isEmpty();
+ }
+
@Test
public void testHistoryPartitionsCascadeToUpdate() throws Exception {
tEnv.executeSql(