This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8ad8ce4068 [iceberg] fix that dropPartition cannot sychronize the
metadata to iceberg (#5209)
8ad8ce4068 is described below
commit 8ad8ce4068eb25654dc2b3d602ff372c1931ce60
Author: LsomeYeah <[email protected]>
AuthorDate: Wed Mar 5 12:28:12 2025 +0800
[iceberg] fix that dropPartition cannot sychronize the metadata to iceberg
(#5209)
---
.../java/org/apache/paimon/AbstractFileStore.java | 48 +++++++++++++---
.../org/apache/paimon/AppendOnlyFileStore.java | 19 +++++++
.../src/main/java/org/apache/paimon/FileStore.java | 3 -
.../java/org/apache/paimon/KeyValueFileStore.java | 19 +++++++
.../paimon/privilege/PrivilegedFileStore.java | 7 ---
.../paimon/table/AbstractFileStoreTable.java | 46 +---------------
.../paimon/table/AppendOnlyFileStoreTable.java | 21 +------
.../paimon/table/PrimaryKeyFileStoreTable.java | 20 +------
.../paimon/iceberg/IcebergCompatibilityTest.java | 64 ++++++++++++++++++++++
.../IcebergHiveMetadataCommitterITCaseBase.java | 14 +++++
10 files changed, 162 insertions(+), 99 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 4e404aa86c..0ee0d667b2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -30,7 +30,9 @@ import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.metastore.AddPartitionCommitCallback;
import org.apache.paimon.metastore.AddPartitionTagCallback;
+import org.apache.paimon.metastore.TagPreviewCommitCallback;
import org.apache.paimon.operation.ChangelogDeletion;
import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.operation.FileStoreScan;
@@ -54,9 +56,11 @@ import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.SuccessFileTagCallback;
import org.apache.paimon.tag.TagAutoManager;
+import org.apache.paimon.tag.TagPreview;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
@@ -86,7 +90,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
protected final String tableName;
protected final CoreOptions options;
protected final RowType partitionType;
- private final CatalogEnvironment catalogEnvironment;
+ protected final CatalogEnvironment catalogEnvironment;
@Nullable private final SegmentsCache<Path> writeManifestCache;
@Nullable private SegmentsCache<Path> readManifestCache;
@@ -279,11 +283,6 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
@Override
public FileStoreCommitImpl newCommit(String commitUser) {
- return newCommit(commitUser, Collections.emptyList());
- }
-
- @Override
- public FileStoreCommitImpl newCommit(String commitUser,
List<CommitCallback> callbacks) {
SnapshotManager snapshotManager = snapshotManager();
SnapshotCommit snapshotCommit =
catalogEnvironment.snapshotCommit(snapshotManager);
if (snapshotCommit == null) {
@@ -314,7 +313,7 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
newStatsFileHandler(),
bucketMode(),
options.scanManifestParallelism(),
- callbacks,
+ createCommitCallbacks(commitUser),
options.commitMaxRetries(),
options.commitTimeout());
}
@@ -366,6 +365,41 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
public abstract Comparator<InternalRow> newKeyComparator();
+ protected List<CommitCallback> createCommitCallbacks(String commitUser) {
+ List<CommitCallback> callbacks =
+ new ArrayList<>(CallbackUtils.loadCommitCallbacks(options));
+
+ if (options.partitionedTableInMetastore() &&
!schema.partitionKeys().isEmpty()) {
+ PartitionHandler partitionHandler =
catalogEnvironment.partitionHandler();
+ if (partitionHandler != null) {
+ InternalRowPartitionComputer partitionComputer =
+ new InternalRowPartitionComputer(
+ options.partitionDefaultName(),
+ schema.logicalPartitionType(),
+ schema.partitionKeys().toArray(new String[0]),
+ options.legacyPartitionName());
+ callbacks.add(new AddPartitionCommitCallback(partitionHandler,
partitionComputer));
+ }
+ }
+
+ TagPreview tagPreview = TagPreview.create(options);
+ if (options.tagToPartitionField() != null
+ && tagPreview != null
+ && schema.partitionKeys().isEmpty()) {
+ PartitionHandler partitionHandler =
catalogEnvironment.partitionHandler();
+ if (partitionHandler != null) {
+ TagPreviewCommitCallback callback =
+ new TagPreviewCommitCallback(
+ new AddPartitionTagCallback(
+ partitionHandler,
options.tagToPartitionField()),
+ tagPreview);
+ callbacks.add(callback);
+ }
+ }
+
+ return callbacks;
+ }
+
@Override
@Nullable
public PartitionExpire newPartitionExpire(String commitUser) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index dc5171a744..4ee81feae7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -22,6 +22,8 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.iceberg.AppendOnlyIcebergCommitCallback;
+import org.apache.paimon.iceberg.IcebergOptions;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.operation.AppendOnlyFileStoreScan;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
@@ -32,8 +34,10 @@ import org.apache.paimon.operation.RawFileSplitRead;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.types.RowType;
import java.util.Comparator;
@@ -167,4 +171,19 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
public Comparator<InternalRow> newKeyComparator() {
return null;
}
+
+ protected List<CommitCallback> createCommitCallbacks(String commitUser) {
+ List<CommitCallback> callbacks =
super.createCommitCallbacks(commitUser);
+
+ if
(options.toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE)
+ != IcebergOptions.StorageType.DISABLED) {
+ callbacks.add(
+ new AppendOnlyIcebergCommitCallback(
+ new AppendOnlyFileStoreTable(
+ fileIO, pathFactory().root(), schema,
catalogEnvironment),
+ commitUser));
+ }
+
+ return callbacks;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 6cd170f4e6..76306eacd7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -35,7 +35,6 @@ import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
-import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
@@ -90,8 +89,6 @@ public interface FileStore<T> {
FileStoreCommit newCommit(String commitUser);
- FileStoreCommit newCommit(String commitUser, List<CommitCallback>
callbacks);
-
SnapshotDeletion newSnapshotDeletion();
ChangelogDeletion newChangelogDeletion();
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 18316901bf..3e18446496 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -23,6 +23,8 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.iceberg.IcebergOptions;
+import org.apache.paimon.iceberg.PrimaryKeyIcebergCommitCallback;
import org.apache.paimon.index.HashIndexMaintainer;
import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.io.KeyValueFileReaderFactory;
@@ -41,6 +43,8 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.PrimaryKeyFileStoreTable;
+import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.KeyComparatorSupplier;
@@ -261,4 +265,19 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
public Comparator<InternalRow> newKeyComparator() {
return keyComparatorSupplier.get();
}
+
+ protected List<CommitCallback> createCommitCallbacks(String commitUser) {
+ List<CommitCallback> callbacks =
super.createCommitCallbacks(commitUser);
+
+ if
(options.toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE)
+ != IcebergOptions.StorageType.DISABLED) {
+ callbacks.add(
+ new PrimaryKeyIcebergCommitCallback(
+ new PrimaryKeyFileStoreTable(
+ fileIO, pathFactory().root(), schema,
catalogEnvironment),
+ commitUser));
+ }
+
+ return callbacks;
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index e00b2a398c..ed899ba833 100644
---
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -39,7 +39,6 @@ import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
-import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
@@ -156,12 +155,6 @@ public class PrivilegedFileStore<T> implements
FileStore<T> {
return wrapped.newCommit(commitUser);
}
- @Override
- public FileStoreCommit newCommit(String commitUser, List<CommitCallback>
callbacks) {
- privilegeChecker.assertCanInsert(identifier);
- return wrapped.newCommit(commitUser, callbacks);
- }
-
@Override
public SnapshotDeletion newSnapshotDeletion() {
privilegeChecker.assertCanInsert(identifier);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index ebeccd24ae..1771a5998f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -27,9 +27,6 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
-import org.apache.paimon.metastore.AddPartitionCommitCallback;
-import org.apache.paimon.metastore.AddPartitionTagCallback;
-import org.apache.paimon.metastore.TagPreviewCommitCallback;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.options.ExpireConfig;
@@ -39,8 +36,6 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaValidation;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.Statistics;
-import org.apache.paimon.table.sink.CallbackUtils;
-import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor;
import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
import org.apache.paimon.table.sink.FixedBucketWriteSelector;
@@ -58,12 +53,10 @@ import
org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
import
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
import
org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
-import org.apache.paimon.tag.TagPreview;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.CatalogBranchManager;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.FileSystemBranchManager;
-import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SimpleFileReader;
@@ -78,7 +71,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -463,7 +455,7 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
}
return new TableCommitImpl(
- store().newCommit(commitUser,
createCommitCallbacks(commitUser)),
+ store().newCommit(commitUser),
snapshotExpire,
options.writeOnly() ? null :
store().newPartitionExpire(commitUser),
options.writeOnly() ? null : store().newTagCreationManager(),
@@ -474,42 +466,6 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
options.forceCreatingSnapshot());
}
- protected List<CommitCallback> createCommitCallbacks(String commitUser) {
- List<CommitCallback> callbacks =
- new
ArrayList<>(CallbackUtils.loadCommitCallbacks(coreOptions()));
- CoreOptions options = coreOptions();
-
- if (options.partitionedTableInMetastore() &&
!tableSchema.partitionKeys().isEmpty()) {
- PartitionHandler partitionHandler =
catalogEnvironment.partitionHandler();
- if (partitionHandler != null) {
- InternalRowPartitionComputer partitionComputer =
- new InternalRowPartitionComputer(
- options.partitionDefaultName(),
- tableSchema.logicalPartitionType(),
- tableSchema.partitionKeys().toArray(new
String[0]),
- options.legacyPartitionName());
- callbacks.add(new AddPartitionCommitCallback(partitionHandler,
partitionComputer));
- }
- }
-
- TagPreview tagPreview = TagPreview.create(options);
- if (options.tagToPartitionField() != null
- && tagPreview != null
- && tableSchema.partitionKeys().isEmpty()) {
- PartitionHandler partitionHandler =
catalogEnvironment.partitionHandler();
- if (partitionHandler != null) {
- TagPreviewCommitCallback callback =
- new TagPreviewCommitCallback(
- new AddPartitionTagCallback(
- partitionHandler,
options.tagToPartitionField()),
- tagPreview);
- callbacks.add(callback);
- }
- }
-
- return callbacks;
- }
-
private Optional<TableSchema> tryTimeTravel(Options options) {
CoreOptions coreOptions = new CoreOptions(options);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 103fa64050..3448a4ddd4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -23,8 +23,6 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.iceberg.AppendOnlyIcebergCommitCallback;
-import org.apache.paimon.iceberg.IcebergOptions;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.operation.AppendOnlyFileStoreScan;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
@@ -34,7 +32,6 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.query.LocalTableQuery;
-import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.AbstractDataTableRead;
import org.apache.paimon.table.source.AppendOnlySplitGenerator;
@@ -47,11 +44,10 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import java.io.IOException;
-import java.util.List;
import java.util.function.BiConsumer;
/** {@link FileStoreTable} for append table. */
-class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
+public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
private static final long serialVersionUID = 1L;
@@ -61,7 +57,7 @@ class AppendOnlyFileStoreTable extends AbstractFileStoreTable
{
this(fileIO, path, tableSchema, CatalogEnvironment.empty());
}
- AppendOnlyFileStoreTable(
+ public AppendOnlyFileStoreTable(
FileIO fileIO,
Path path,
TableSchema tableSchema,
@@ -160,17 +156,4 @@ class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
public LocalTableQuery newLocalTableQuery() {
throw new UnsupportedOperationException();
}
-
- @Override
- protected List<CommitCallback> createCommitCallbacks(String commitUser) {
- List<CommitCallback> callbacks =
super.createCommitCallbacks(commitUser);
- CoreOptions options = coreOptions();
-
- if
(options.toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE)
- != IcebergOptions.StorageType.DISABLED) {
- callbacks.add(new AppendOnlyIcebergCommitCallback(this,
commitUser));
- }
-
- return callbacks;
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index ea71204dc9..835405c513 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -23,8 +23,6 @@ import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.iceberg.IcebergOptions;
-import org.apache.paimon.iceberg.PrimaryKeyIcebergCommitCallback;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.mergetree.compact.LookupMergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
@@ -34,7 +32,6 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.query.LocalTableQuery;
-import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.KeyValueTableRead;
@@ -50,7 +47,7 @@ import static
org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMap
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
/** {@link FileStoreTable} for primary key table. */
-class PrimaryKeyFileStoreTable extends AbstractFileStoreTable {
+public class PrimaryKeyFileStoreTable extends AbstractFileStoreTable {
private static final long serialVersionUID = 1L;
@@ -60,7 +57,7 @@ class PrimaryKeyFileStoreTable extends AbstractFileStoreTable
{
this(fileIO, path, tableSchema, CatalogEnvironment.empty());
}
- PrimaryKeyFileStoreTable(
+ public PrimaryKeyFileStoreTable(
FileIO fileIO,
Path path,
TableSchema tableSchema,
@@ -176,17 +173,4 @@ class PrimaryKeyFileStoreTable extends
AbstractFileStoreTable {
public LocalTableQuery newLocalTableQuery() {
return new LocalTableQuery(this);
}
-
- @Override
- protected List<CommitCallback> createCommitCallbacks(String commitUser) {
- List<CommitCallback> callbacks =
super.createCommitCallbacks(commitUser);
- CoreOptions options = coreOptions();
-
- if
(options.toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE)
- != IcebergOptions.StorageType.DISABLED) {
- callbacks.add(new PrimaryKeyIcebergCommitCallback(this,
commitUser));
- }
-
- return callbacks;
- }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index 934f48e067..8cd37bdd1e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -191,6 +191,70 @@ public class IcebergCompatibilityTest {
commit.close();
}
+ @Test
+ public void testDropPartition() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING(), DataTypes.INT(),
DataTypes.INT(), DataTypes.INT()
+ },
+ new String[] {"pt1", "pt2", "k", "v"});
+
+ FileStoreTable table =
+ createPaimonTable(
+ rowType,
+ Arrays.asList("pt1", "pt2"),
+ Arrays.asList("pt1", "pt2", "k"),
+ 1,
+ Collections.emptyMap());
+
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser);
+
+ write.write(GenericRow.of(BinaryString.fromString("20250304"), 15, 1,
1));
+ write.write(GenericRow.of(BinaryString.fromString("20250304"), 16, 1,
1));
+ write.write(GenericRow.of(BinaryString.fromString("20250305"), 15, 1,
1));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ assertThat(getIcebergResult())
+ .containsExactlyInAnyOrder(
+ "Record(20250304, 15, 1, 1)",
+ "Record(20250304, 16, 1, 1)",
+ "Record(20250305, 15, 1, 1)");
+
+ LocalFileIO fileIO = LocalFileIO.create();
+ Path path = new Path(tempDir.toString());
+
+ try (FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO,
path)) {
+ Identifier paimonIdentifier = Identifier.create("mydb", "t");
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ // delete the second-level partition
+ Map<String, String> partition = new HashMap<>();
+ partition.put("pt1", "20250304");
+ partition.put("pt2", "16");
+ paimonCatalog.dropPartitions(
+ paimonIdentifier,
Collections.singletonList(partition));
+
+ assertThat(getIcebergResult())
+ .containsExactlyInAnyOrder(
+ "Record(20250304, 15, 1, 1)",
"Record(20250305, 15, 1, 1)");
+ } else {
+ // delete the first-level partition
+ Map<String, String> partition = new HashMap<>();
+ partition.put("pt1", "20250304");
+ paimonCatalog.dropPartitions(
+ paimonIdentifier,
Collections.singletonList(partition));
+
+ assertThat(getIcebergResult())
+ .containsExactlyInAnyOrder("Record(20250305, 15, 1,
1)");
+ }
+ }
+
+ write.close();
+ commit.close();
+ }
+
@Test
public void testRetryCreateMetadata() throws Exception {
RowType rowType =
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java
index c0c68888b8..baadf9c3de 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java
@@ -106,6 +106,12 @@ public abstract class
IcebergHiveMetadataCommitterITCaseBase {
Row.of(2, 2, "elephant")),
collect(tEnv.executeSql("SELECT * FROM my_iceberg.test_db.t
ORDER BY pt, id")));
+ // test drop partition
+ tEnv.executeSql("ALTER TABLE my_paimon.test_db.t DROP PARTITION (pt =
1)").await();
+ Assert.assertEquals(
+ Arrays.asList(Row.of(2, 1, "cat"), Row.of(2, 2, "elephant")),
+ collect(tEnv.executeSql("SELECT * FROM my_iceberg.test_db.t
ORDER BY pt, id")));
+
Assert.assertTrue(
hiveShell
.executeQuery("DESC DATABASE EXTENDED test_db")
@@ -196,6 +202,14 @@ public abstract class
IcebergHiveMetadataCommitterITCaseBase {
tEnv.executeSql(
"SELECT data, id, pt FROM my_iceberg.test_db.t
WHERE id > 1 ORDER BY pt, id")));
+ // test drop partition
+ tEnv.executeSql("ALTER TABLE my_paimon.test_db.t DROP PARTITION (pt =
2)").await();
+ Assert.assertEquals(
+ Arrays.asList(Row.of("pear", 2, 1), Row.of("cherry", 3, 1)),
+ collect(
+ tEnv.executeSql(
+ "SELECT data, id, pt FROM my_iceberg.test_db.t
WHERE id > 1 ORDER BY pt, id")));
+
// specify a dedicated hive database and table for paimon iceberg
commiter
tEnv.executeSql(
"CREATE TABLE my_paimon.test_db.t1 ( pt INT, id INT, data
STRING ) PARTITIONED BY (pt) WITH "