This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ad8ce4068 [iceberg] fix that dropPartition cannot synchronize the 
metadata to iceberg (#5209)
8ad8ce4068 is described below

commit 8ad8ce4068eb25654dc2b3d602ff372c1931ce60
Author: LsomeYeah <[email protected]>
AuthorDate: Wed Mar 5 12:28:12 2025 +0800

    [iceberg] fix that dropPartition cannot synchronize the metadata to iceberg 
(#5209)
---
 .../java/org/apache/paimon/AbstractFileStore.java  | 48 +++++++++++++---
 .../org/apache/paimon/AppendOnlyFileStore.java     | 19 +++++++
 .../src/main/java/org/apache/paimon/FileStore.java |  3 -
 .../java/org/apache/paimon/KeyValueFileStore.java  | 19 +++++++
 .../paimon/privilege/PrivilegedFileStore.java      |  7 ---
 .../paimon/table/AbstractFileStoreTable.java       | 46 +---------------
 .../paimon/table/AppendOnlyFileStoreTable.java     | 21 +------
 .../paimon/table/PrimaryKeyFileStoreTable.java     | 20 +------
 .../paimon/iceberg/IcebergCompatibilityTest.java   | 64 ++++++++++++++++++++++
 .../IcebergHiveMetadataCommitterITCaseBase.java    | 14 +++++
 10 files changed, 162 insertions(+), 99 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 4e404aa86c..0ee0d667b2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -30,7 +30,9 @@ import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.manifest.IndexManifestFile;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.metastore.AddPartitionCommitCallback;
 import org.apache.paimon.metastore.AddPartitionTagCallback;
+import org.apache.paimon.metastore.TagPreviewCommitCallback;
 import org.apache.paimon.operation.ChangelogDeletion;
 import org.apache.paimon.operation.FileStoreCommitImpl;
 import org.apache.paimon.operation.FileStoreScan;
@@ -54,9 +56,11 @@ import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.SuccessFileTagCallback;
 import org.apache.paimon.tag.TagAutoManager;
+import org.apache.paimon.tag.TagPreview;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
@@ -86,7 +90,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
     protected final String tableName;
     protected final CoreOptions options;
     protected final RowType partitionType;
-    private final CatalogEnvironment catalogEnvironment;
+    protected final CatalogEnvironment catalogEnvironment;
 
     @Nullable private final SegmentsCache<Path> writeManifestCache;
     @Nullable private SegmentsCache<Path> readManifestCache;
@@ -279,11 +283,6 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
 
     @Override
     public FileStoreCommitImpl newCommit(String commitUser) {
-        return newCommit(commitUser, Collections.emptyList());
-    }
-
-    @Override
-    public FileStoreCommitImpl newCommit(String commitUser, 
List<CommitCallback> callbacks) {
         SnapshotManager snapshotManager = snapshotManager();
         SnapshotCommit snapshotCommit = 
catalogEnvironment.snapshotCommit(snapshotManager);
         if (snapshotCommit == null) {
@@ -314,7 +313,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 newStatsFileHandler(),
                 bucketMode(),
                 options.scanManifestParallelism(),
-                callbacks,
+                createCommitCallbacks(commitUser),
                 options.commitMaxRetries(),
                 options.commitTimeout());
     }
@@ -366,6 +365,41 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
 
     public abstract Comparator<InternalRow> newKeyComparator();
 
+    protected List<CommitCallback> createCommitCallbacks(String commitUser) {
+        List<CommitCallback> callbacks =
+                new ArrayList<>(CallbackUtils.loadCommitCallbacks(options));
+
+        if (options.partitionedTableInMetastore() && 
!schema.partitionKeys().isEmpty()) {
+            PartitionHandler partitionHandler = 
catalogEnvironment.partitionHandler();
+            if (partitionHandler != null) {
+                InternalRowPartitionComputer partitionComputer =
+                        new InternalRowPartitionComputer(
+                                options.partitionDefaultName(),
+                                schema.logicalPartitionType(),
+                                schema.partitionKeys().toArray(new String[0]),
+                                options.legacyPartitionName());
+                callbacks.add(new AddPartitionCommitCallback(partitionHandler, 
partitionComputer));
+            }
+        }
+
+        TagPreview tagPreview = TagPreview.create(options);
+        if (options.tagToPartitionField() != null
+                && tagPreview != null
+                && schema.partitionKeys().isEmpty()) {
+            PartitionHandler partitionHandler = 
catalogEnvironment.partitionHandler();
+            if (partitionHandler != null) {
+                TagPreviewCommitCallback callback =
+                        new TagPreviewCommitCallback(
+                                new AddPartitionTagCallback(
+                                        partitionHandler, 
options.tagToPartitionField()),
+                                tagPreview);
+                callbacks.add(callback);
+            }
+        }
+
+        return callbacks;
+    }
+
     @Override
     @Nullable
     public PartitionExpire newPartitionExpire(String commitUser) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index dc5171a744..4ee81feae7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -22,6 +22,8 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.iceberg.AppendOnlyIcebergCommitCallback;
+import org.apache.paimon.iceberg.IcebergOptions;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.operation.AppendOnlyFileStoreScan;
 import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
@@ -32,8 +34,10 @@ import org.apache.paimon.operation.RawFileSplitRead;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.types.RowType;
 
 import java.util.Comparator;
@@ -167,4 +171,19 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
     public Comparator<InternalRow> newKeyComparator() {
         return null;
     }
+
+    protected List<CommitCallback> createCommitCallbacks(String commitUser) {
+        List<CommitCallback> callbacks = 
super.createCommitCallbacks(commitUser);
+
+        if 
(options.toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE)
+                != IcebergOptions.StorageType.DISABLED) {
+            callbacks.add(
+                    new AppendOnlyIcebergCommitCallback(
+                            new AppendOnlyFileStoreTable(
+                                    fileIO, pathFactory().root(), schema, 
catalogEnvironment),
+                            commitUser));
+        }
+
+        return callbacks;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 6cd170f4e6..76306eacd7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -35,7 +35,6 @@ import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.service.ServiceManager;
 import org.apache.paimon.stats.StatsFileHandler;
 import org.apache.paimon.table.BucketMode;
-import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.types.RowType;
@@ -90,8 +89,6 @@ public interface FileStore<T> {
 
     FileStoreCommit newCommit(String commitUser);
 
-    FileStoreCommit newCommit(String commitUser, List<CommitCallback> 
callbacks);
-
     SnapshotDeletion newSnapshotDeletion();
 
     ChangelogDeletion newChangelogDeletion();
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 18316901bf..3e18446496 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -23,6 +23,8 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.iceberg.IcebergOptions;
+import org.apache.paimon.iceberg.PrimaryKeyIcebergCommitCallback;
 import org.apache.paimon.index.HashIndexMaintainer;
 import org.apache.paimon.index.IndexMaintainer;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
@@ -41,6 +43,8 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.PrimaryKeyFileStoreTable;
+import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.KeyComparatorSupplier;
@@ -261,4 +265,19 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
     public Comparator<InternalRow> newKeyComparator() {
         return keyComparatorSupplier.get();
     }
+
+    protected List<CommitCallback> createCommitCallbacks(String commitUser) {
+        List<CommitCallback> callbacks = 
super.createCommitCallbacks(commitUser);
+
+        if 
(options.toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE)
+                != IcebergOptions.StorageType.DISABLED) {
+            callbacks.add(
+                    new PrimaryKeyIcebergCommitCallback(
+                            new PrimaryKeyFileStoreTable(
+                                    fileIO, pathFactory().root(), schema, 
catalogEnvironment),
+                            commitUser));
+        }
+
+        return callbacks;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index e00b2a398c..ed899ba833 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -39,7 +39,6 @@ import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.service.ServiceManager;
 import org.apache.paimon.stats.StatsFileHandler;
 import org.apache.paimon.table.BucketMode;
-import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.types.RowType;
@@ -156,12 +155,6 @@ public class PrivilegedFileStore<T> implements 
FileStore<T> {
         return wrapped.newCommit(commitUser);
     }
 
-    @Override
-    public FileStoreCommit newCommit(String commitUser, List<CommitCallback> 
callbacks) {
-        privilegeChecker.assertCanInsert(identifier);
-        return wrapped.newCommit(commitUser, callbacks);
-    }
-
     @Override
     public SnapshotDeletion newSnapshotDeletion() {
         privilegeChecker.assertCanInsert(identifier);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index ebeccd24ae..1771a5998f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -27,9 +27,6 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
-import org.apache.paimon.metastore.AddPartitionCommitCallback;
-import org.apache.paimon.metastore.AddPartitionTagCallback;
-import org.apache.paimon.metastore.TagPreviewCommitCallback;
 import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.options.ExpireConfig;
@@ -39,8 +36,6 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaValidation;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.Statistics;
-import org.apache.paimon.table.sink.CallbackUtils;
-import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor;
 import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
 import org.apache.paimon.table.sink.FixedBucketWriteSelector;
@@ -58,12 +53,10 @@ import 
org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
 import 
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
 import 
org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
 import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
-import org.apache.paimon.tag.TagPreview;
 import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.CatalogBranchManager;
 import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.FileSystemBranchManager;
-import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SimpleFileReader;
@@ -78,7 +71,6 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -463,7 +455,7 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
         }
 
         return new TableCommitImpl(
-                store().newCommit(commitUser, 
createCommitCallbacks(commitUser)),
+                store().newCommit(commitUser),
                 snapshotExpire,
                 options.writeOnly() ? null : 
store().newPartitionExpire(commitUser),
                 options.writeOnly() ? null : store().newTagCreationManager(),
@@ -474,42 +466,6 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 options.forceCreatingSnapshot());
     }
 
-    protected List<CommitCallback> createCommitCallbacks(String commitUser) {
-        List<CommitCallback> callbacks =
-                new 
ArrayList<>(CallbackUtils.loadCommitCallbacks(coreOptions()));
-        CoreOptions options = coreOptions();
-
-        if (options.partitionedTableInMetastore() && 
!tableSchema.partitionKeys().isEmpty()) {
-            PartitionHandler partitionHandler = 
catalogEnvironment.partitionHandler();
-            if (partitionHandler != null) {
-                InternalRowPartitionComputer partitionComputer =
-                        new InternalRowPartitionComputer(
-                                options.partitionDefaultName(),
-                                tableSchema.logicalPartitionType(),
-                                tableSchema.partitionKeys().toArray(new 
String[0]),
-                                options.legacyPartitionName());
-                callbacks.add(new AddPartitionCommitCallback(partitionHandler, 
partitionComputer));
-            }
-        }
-
-        TagPreview tagPreview = TagPreview.create(options);
-        if (options.tagToPartitionField() != null
-                && tagPreview != null
-                && tableSchema.partitionKeys().isEmpty()) {
-            PartitionHandler partitionHandler = 
catalogEnvironment.partitionHandler();
-            if (partitionHandler != null) {
-                TagPreviewCommitCallback callback =
-                        new TagPreviewCommitCallback(
-                                new AddPartitionTagCallback(
-                                        partitionHandler, 
options.tagToPartitionField()),
-                                tagPreview);
-                callbacks.add(callback);
-            }
-        }
-
-        return callbacks;
-    }
-
     private Optional<TableSchema> tryTimeTravel(Options options) {
         CoreOptions coreOptions = new CoreOptions(options);
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 103fa64050..3448a4ddd4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -23,8 +23,6 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.iceberg.AppendOnlyIcebergCommitCallback;
-import org.apache.paimon.iceberg.IcebergOptions;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.operation.AppendOnlyFileStoreScan;
 import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
@@ -34,7 +32,6 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.query.LocalTableQuery;
-import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.source.AbstractDataTableRead;
 import org.apache.paimon.table.source.AppendOnlySplitGenerator;
@@ -47,11 +44,10 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.function.BiConsumer;
 
 /** {@link FileStoreTable} for append table. */
-class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
+public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
 
     private static final long serialVersionUID = 1L;
 
@@ -61,7 +57,7 @@ class AppendOnlyFileStoreTable extends AbstractFileStoreTable 
{
         this(fileIO, path, tableSchema, CatalogEnvironment.empty());
     }
 
-    AppendOnlyFileStoreTable(
+    public AppendOnlyFileStoreTable(
             FileIO fileIO,
             Path path,
             TableSchema tableSchema,
@@ -160,17 +156,4 @@ class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
     public LocalTableQuery newLocalTableQuery() {
         throw new UnsupportedOperationException();
     }
-
-    @Override
-    protected List<CommitCallback> createCommitCallbacks(String commitUser) {
-        List<CommitCallback> callbacks = 
super.createCommitCallbacks(commitUser);
-        CoreOptions options = coreOptions();
-
-        if 
(options.toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE)
-                != IcebergOptions.StorageType.DISABLED) {
-            callbacks.add(new AppendOnlyIcebergCommitCallback(this, 
commitUser));
-        }
-
-        return callbacks;
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index ea71204dc9..835405c513 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -23,8 +23,6 @@ import org.apache.paimon.KeyValue;
 import org.apache.paimon.KeyValueFileStore;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.iceberg.IcebergOptions;
-import org.apache.paimon.iceberg.PrimaryKeyIcebergCommitCallback;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.mergetree.compact.LookupMergeFunction;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
@@ -34,7 +32,6 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.query.LocalTableQuery;
-import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.KeyValueTableRead;
@@ -50,7 +47,7 @@ import static 
org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMap
 import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
 
 /** {@link FileStoreTable} for primary key table. */
-class PrimaryKeyFileStoreTable extends AbstractFileStoreTable {
+public class PrimaryKeyFileStoreTable extends AbstractFileStoreTable {
 
     private static final long serialVersionUID = 1L;
 
@@ -60,7 +57,7 @@ class PrimaryKeyFileStoreTable extends AbstractFileStoreTable 
{
         this(fileIO, path, tableSchema, CatalogEnvironment.empty());
     }
 
-    PrimaryKeyFileStoreTable(
+    public PrimaryKeyFileStoreTable(
             FileIO fileIO,
             Path path,
             TableSchema tableSchema,
@@ -176,17 +173,4 @@ class PrimaryKeyFileStoreTable extends 
AbstractFileStoreTable {
     public LocalTableQuery newLocalTableQuery() {
         return new LocalTableQuery(this);
     }
-
-    @Override
-    protected List<CommitCallback> createCommitCallbacks(String commitUser) {
-        List<CommitCallback> callbacks = 
super.createCommitCallbacks(commitUser);
-        CoreOptions options = coreOptions();
-
-        if 
(options.toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE)
-                != IcebergOptions.StorageType.DISABLED) {
-            callbacks.add(new PrimaryKeyIcebergCommitCallback(this, 
commitUser));
-        }
-
-        return callbacks;
-    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index 934f48e067..8cd37bdd1e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -191,6 +191,70 @@ public class IcebergCompatibilityTest {
         commit.close();
     }
 
+    @Test
+    public void testDropPartition() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING(), DataTypes.INT(), 
DataTypes.INT(), DataTypes.INT()
+                        },
+                        new String[] {"pt1", "pt2", "k", "v"});
+
+        FileStoreTable table =
+                createPaimonTable(
+                        rowType,
+                        Arrays.asList("pt1", "pt2"),
+                        Arrays.asList("pt1", "pt2", "k"),
+                        1,
+                        Collections.emptyMap());
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(BinaryString.fromString("20250304"), 15, 1, 
1));
+        write.write(GenericRow.of(BinaryString.fromString("20250304"), 16, 1, 
1));
+        write.write(GenericRow.of(BinaryString.fromString("20250305"), 15, 1, 
1));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        assertThat(getIcebergResult())
+                .containsExactlyInAnyOrder(
+                        "Record(20250304, 15, 1, 1)",
+                        "Record(20250304, 16, 1, 1)",
+                        "Record(20250305, 15, 1, 1)");
+
+        LocalFileIO fileIO = LocalFileIO.create();
+        Path path = new Path(tempDir.toString());
+
+        try (FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, 
path)) {
+            Identifier paimonIdentifier = Identifier.create("mydb", "t");
+            if (ThreadLocalRandom.current().nextBoolean()) {
+                // delete the second-level partition
+                Map<String, String> partition = new HashMap<>();
+                partition.put("pt1", "20250304");
+                partition.put("pt2", "16");
+                paimonCatalog.dropPartitions(
+                        paimonIdentifier, 
Collections.singletonList(partition));
+
+                assertThat(getIcebergResult())
+                        .containsExactlyInAnyOrder(
+                                "Record(20250304, 15, 1, 1)", 
"Record(20250305, 15, 1, 1)");
+            } else {
+                // delete the first-level partition
+                Map<String, String> partition = new HashMap<>();
+                partition.put("pt1", "20250304");
+                paimonCatalog.dropPartitions(
+                        paimonIdentifier, 
Collections.singletonList(partition));
+
+                assertThat(getIcebergResult())
+                        .containsExactlyInAnyOrder("Record(20250305, 15, 1, 
1)");
+            }
+        }
+
+        write.close();
+        commit.close();
+    }
+
     @Test
     public void testRetryCreateMetadata() throws Exception {
         RowType rowType =
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java
index c0c68888b8..baadf9c3de 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java
@@ -106,6 +106,12 @@ public abstract class 
IcebergHiveMetadataCommitterITCaseBase {
                         Row.of(2, 2, "elephant")),
                 collect(tEnv.executeSql("SELECT * FROM my_iceberg.test_db.t 
ORDER BY pt, id")));
 
+        // test drop partition
+        tEnv.executeSql("ALTER TABLE my_paimon.test_db.t DROP PARTITION (pt = 
1)").await();
+        Assert.assertEquals(
+                Arrays.asList(Row.of(2, 1, "cat"), Row.of(2, 2, "elephant")),
+                collect(tEnv.executeSql("SELECT * FROM my_iceberg.test_db.t 
ORDER BY pt, id")));
+
         Assert.assertTrue(
                 hiveShell
                         .executeQuery("DESC DATABASE EXTENDED test_db")
@@ -196,6 +202,14 @@ public abstract class 
IcebergHiveMetadataCommitterITCaseBase {
                         tEnv.executeSql(
                                 "SELECT data, id, pt FROM my_iceberg.test_db.t 
WHERE id > 1 ORDER BY pt, id")));
 
+        // test drop partition
+        tEnv.executeSql("ALTER TABLE my_paimon.test_db.t DROP PARTITION (pt = 
2)").await();
+        Assert.assertEquals(
+                Arrays.asList(Row.of("pear", 2, 1), Row.of("cherry", 3, 1)),
+                collect(
+                        tEnv.executeSql(
+                                "SELECT data, id, pt FROM my_iceberg.test_db.t 
WHERE id > 1 ORDER BY pt, id")));
+
         // specify a dedicated hive database and table for paimon iceberg 
commiter
         tEnv.executeSql(
                 "CREATE TABLE my_paimon.test_db.t1 ( pt INT, id INT, data 
STRING ) PARTITIONED BY (pt) WITH "

Reply via email to