This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 81e60e3dd [core] BatchWriteBuilder#withOverwrite should not accept 
multiple partitions list (#766)
81e60e3dd is described below

commit 81e60e3dd011861449960afb37ae6f07b8941838
Author: yuzelin <[email protected]>
AuthorDate: Thu Mar 30 14:17:04 2023 +0800

    [core] BatchWriteBuilder#withOverwrite should not accept multiple 
partitions list (#766)
---
 paimon-core/pom.xml                                |  7 ++
 .../apache/paimon/operation/FileStoreCommit.java   | 32 +++----
 .../paimon/operation/FileStoreCommitImpl.java      | 68 ++++++++-------
 .../apache/paimon/operation/PartitionExpire.java   |  6 +-
 .../paimon/table/sink/BatchWriteBuilder.java       | 11 +--
 .../paimon/table/sink/BatchWriteBuilderImpl.java   | 11 ++-
 .../apache/paimon/table/sink/InnerTableCommit.java | 13 +--
 .../apache/paimon/table/sink/TableCommitImpl.java  | 14 ++--
 .../test/java/org/apache/paimon/TestFileStore.java | 21 ++++-
 .../paimon/operation/FileStoreCommitTest.java      | 98 +++++++++++++++++++++-
 .../paimon/flink/action/DropPartitionAction.java   | 22 +++--
 11 files changed, 211 insertions(+), 92 deletions(-)

diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index 8a7f63df8..b21cb2fe3 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -145,6 +145,13 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-test-utils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index 41e69a94b..2c114b830 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -18,9 +18,9 @@
 
 package org.apache.paimon.operation;
 
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.manifest.ManifestCommittable;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -52,24 +52,26 @@ public interface FileStoreCommit {
     /** Commit from manifest committable. */
     void commit(ManifestCommittable committable, Map<String, String> 
properties);
 
-    /** Overwrite a single partition from manifest committable. */
-    default void overwrite(
-            Map<String, String> partition,
-            ManifestCommittable committable,
-            Map<String, String> properties) {
-        overwrite(Collections.singletonList(partition), committable, 
properties);
-    }
-
     /**
-     * Overwrite multiple partitions from manifest committable.
+     * Overwrite from manifest committable and partition.
+     *
+     * <p>TODO: The method's semantics can be dynamic or static overwrite 
according to properties.
      *
-     * @param partitions A list of partition {@link Map}s that maps each 
partition key to a
-     *     partition value. Depending on the user-defined statement, the 
partition might not include
-     *     all partition keys. Also note that this partition does not 
necessarily equal to the
-     *     partitions of the newly added key-values. This is just the 
partition to be cleaned up.
+     * @param partition A single partition maps each partition key to a 
partition value. Depending
+     *     on the user-defined statement, the partition might not include all 
partition keys. Also
+     *     note that this partition does not necessarily equal to the 
partitions of the newly added
+     *     key-values. This is just the partition to be cleaned up.
      */
     void overwrite(
-            List<Map<String, String>> partitions,
+            Map<String, String> partition,
             ManifestCommittable committable,
             Map<String, String> properties);
+
+    /**
+     * Drop multiple partitions. The {@link Snapshot.CommitKind} of generated 
snapshot is {@link
+     * Snapshot.CommitKind#OVERWRITE}.
+     *
+     * @param partitions A list of partition {@link Map}s. NOTE: cannot be 
empty!
+     */
+    void dropPartitions(List<Map<String, String>> partitions);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 685fe6d00..a7f7d7e38 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -34,10 +34,12 @@ import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -246,15 +248,13 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
 
     @Override
     public void overwrite(
-            List<Map<String, String>> partitions,
+            Map<String, String> partition,
             ManifestCommittable committable,
             Map<String, String> properties) {
         if (LOG.isDebugEnabled()) {
             LOG.debug(
                     "Ready to overwrite partition "
-                            + partitions.stream()
-                                    .map(Object::toString)
-                                    .collect(Collectors.joining(","))
+                            + partition.toString()
                             + "\n"
                             + committable.toString());
         }
@@ -287,35 +287,20 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         }
 
         // sanity check, all changes must be done within the given partition
-        List<Predicate> partitionFilters = new ArrayList<>();
-        for (Map<String, String> partition : partitions) {
-            Predicate partitionFilter = PredicateBuilder.partition(partition, 
partitionType);
-            if (partitionFilter != null) {
-                for (ManifestEntry entry : appendTableFiles) {
-                    if (!partitionFilter.test(
-                            
partitionObjectConverter.convert(entry.partition()))) {
-                        throw new IllegalArgumentException(
-                                "Trying to overwrite partition "
-                                        + partition
-                                        + ", but the changes in "
-                                        + 
pathFactory.getPartitionString(entry.partition())
-                                        + " does not belong to this 
partition");
-                    }
+        Predicate partitionFilter = PredicateBuilder.partition(partition, 
partitionType);
+        if (partitionFilter != null) {
+            for (ManifestEntry entry : appendTableFiles) {
+                if 
(!partitionFilter.test(partitionObjectConverter.convert(entry.partition()))) {
+                    throw new IllegalArgumentException(
+                            "Trying to overwrite partition "
+                                    + partition
+                                    + ", but the changes in "
+                                    + 
pathFactory.getPartitionString(entry.partition())
+                                    + " does not belong to this partition");
                 }
-
-                partitionFilters.add(partitionFilter);
             }
         }
 
-        Predicate partitionFilter;
-        if (partitionFilters.size() == 0) {
-            partitionFilter = null;
-        } else if (partitionFilters.size() == 1) {
-            partitionFilter = partitionFilters.get(0);
-        } else {
-            partitionFilter = PredicateBuilder.or(partitionFilters);
-        }
-
         // overwrite new files
         tryOverwrite(
                 partitionFilter,
@@ -336,6 +321,31 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         }
     }
 
+    @Override
+    public void dropPartitions(List<Map<String, String>> partitions) {
+        Preconditions.checkArgument(!partitions.isEmpty(), "Partitions list 
cannot be empty.");
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "Ready to drop partitions {}",
+                    
partitions.stream().map(Objects::toString).collect(Collectors.joining(",")));
+        }
+
+        Predicate partitionFilter =
+                partitions.stream()
+                        .map(partition -> 
PredicateBuilder.partition(partition, partitionType))
+                        .reduce(PredicateBuilder::or)
+                        .orElseThrow(() -> new RuntimeException("Failed to get 
partition filter."));
+
+        tryOverwrite(
+                partitionFilter,
+                Collections.emptyList(),
+                // identifier is MAX_VALUE to avoid conflict
+                BatchWriteBuilder.COMMIT_IDENTIFIER,
+                null,
+                Collections.emptyMap());
+    }
+
     private void collectChanges(
             List<CommitMessage> commitMessages,
             List<ManifestEntry> appendTableFiles,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index 74c5eca48..f7687df28 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -20,7 +20,6 @@ package org.apache.paimon.operation;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.partition.PartitionTimeExtractor;
 import org.apache.paimon.types.RowType;
@@ -30,7 +29,6 @@ import java.time.Duration;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -101,9 +99,7 @@ public class PartitionExpire {
         }
 
         if (expired.size() > 0) {
-            // identifier is MAX_VALUE to avoid conflict.
-            ManifestCommittable committable = new 
ManifestCommittable(Long.MAX_VALUE);
-            commit.overwrite(expired, committable, Collections.emptyMap());
+            commit.dropPartitions(expired);
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java
index abcab70c9..fbfcd250f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java
@@ -24,7 +24,6 @@ import org.apache.paimon.data.InternalRow;
 import javax.annotation.Nullable;
 
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -63,15 +62,7 @@ public interface BatchWriteBuilder extends WriteBuilder {
     }
 
     /** Overwrite writing, same as the 'INSERT OVERWRITE T PARTITION (...)' 
semantics of SQL. */
-    default BatchWriteBuilder withOverwrite(@Nullable Map<String, String> 
staticPartition) {
-        if (staticPartition != null) {
-            withOverwrite(Collections.singletonList(staticPartition));
-        }
-        return this;
-    }
-
-    /** Overwrite writing, multiple static partitions can be specified. */
-    BatchWriteBuilder withOverwrite(@Nullable List<Map<String, String>> 
staticPartitions);
+    BatchWriteBuilder withOverwrite(@Nullable Map<String, String> 
staticPartition);
 
     /** Create a {@link TableWrite} to write {@link InternalRow}s. */
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
index a1342e652..407390f4c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
@@ -23,7 +23,6 @@ import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
 
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -35,7 +34,7 @@ public class BatchWriteBuilderImpl implements 
BatchWriteBuilder {
     private final InnerTable table;
     private final String commitUser = UUID.randomUUID().toString();
 
-    private List<Map<String, String>> staticPartitions;
+    private Map<String, String> staticPartition;
 
     public BatchWriteBuilderImpl(InnerTable table) {
         this.table = table;
@@ -52,19 +51,19 @@ public class BatchWriteBuilderImpl implements 
BatchWriteBuilder {
     }
 
     @Override
-    public BatchWriteBuilder withOverwrite(@Nullable List<Map<String, String>> 
staticPartitions) {
-        this.staticPartitions = staticPartitions;
+    public BatchWriteBuilder withOverwrite(@Nullable Map<String, String> 
staticPartition) {
+        this.staticPartition = staticPartition;
         return this;
     }
 
     @Override
     public BatchTableWrite newWrite() {
-        return table.newWrite(commitUser).withOverwrite(staticPartitions != 
null);
+        return table.newWrite(commitUser).withOverwrite(staticPartition != 
null);
     }
 
     @Override
     public BatchTableCommit newCommit() {
-        InnerTableCommit commit = 
table.newCommit(commitUser).withOverwrite(staticPartitions);
+        InnerTableCommit commit = 
table.newCommit(commitUser).withOverwrite(staticPartition);
         commit.ignoreEmptyCommit(true);
         return commit;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
index 2f4e5430c..f5bbcabcc 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
@@ -22,26 +22,17 @@ import org.apache.paimon.operation.Lock;
 
 import javax.annotation.Nullable;
 
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 
 /** Inner {@link TableCommit} contains overwrite setter. */
 public interface InnerTableCommit extends StreamTableCommit, BatchTableCommit {
 
     /** Overwrite writing, same as the 'INSERT OVERWRITE T PARTITION (...)' 
semantics of SQL. */
-    default InnerTableCommit withOverwrite(@Nullable Map<String, String> 
staticPartition) {
-        if (staticPartition != null) {
-            withOverwrite(Collections.singletonList(staticPartition));
-        }
-        return this;
-    }
-
-    InnerTableCommit withOverwrite(@Nullable List<Map<String, String>> 
overwritePartitions);
+    InnerTableCommit withOverwrite(@Nullable Map<String, String> 
staticPartition);
 
     /**
      * If this is set to true, when there is no new data, no snapshot will be 
generated. By default,
-     * empty commit is be ignored.
+     * empty commit is ignored.
      *
      * <ul>
      *   <li>For Streaming: the default value of 'ignoreEmptyCommit' is false.
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 190e5f863..0236e27ad 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -44,7 +44,7 @@ public class TableCommitImpl implements InnerTableCommit {
     @Nullable private final FileStoreExpire expire;
     @Nullable private final PartitionExpire partitionExpire;
 
-    @Nullable private List<Map<String, String>> overwritePartitions = null;
+    @Nullable private Map<String, String> overwritePartition = null;
     @Nullable private Lock lock;
 
     private boolean batchCommitted = false;
@@ -59,8 +59,8 @@ public class TableCommitImpl implements InnerTableCommit {
     }
 
     @Override
-    public TableCommitImpl withOverwrite(@Nullable List<Map<String, String>> 
overwritePartitions) {
-        this.overwritePartitions = overwritePartitions;
+    public TableCommitImpl withOverwrite(@Nullable Map<String, String> 
overwritePartitions) {
+        this.overwritePartition = overwritePartitions;
         return this;
     }
 
@@ -97,10 +97,10 @@ public class TableCommitImpl implements InnerTableCommit {
         for (CommitMessage commitMessage : commitMessages) {
             committable.addFileCommittable(commitMessage);
         }
-        if (overwritePartitions == null) {
+        if (overwritePartition == null) {
             commit.commit(committable, new HashMap<>());
         } else {
-            commit.overwrite(overwritePartitions, committable, new 
HashMap<>());
+            commit.overwrite(overwritePartition, committable, new HashMap<>());
         }
         expire();
     }
@@ -110,7 +110,7 @@ public class TableCommitImpl implements InnerTableCommit {
     }
 
     public void commitMultiple(List<ManifestCommittable> committables) {
-        if (overwritePartitions == null) {
+        if (overwritePartition == null) {
             for (ManifestCommittable committable : committables) {
                 commit.commit(committable, new HashMap<>());
             }
@@ -128,7 +128,7 @@ public class TableCommitImpl implements InnerTableCommit {
                 // TODO maybe it can be produced by CommitterOperator
                 committable = new ManifestCommittable(Long.MAX_VALUE);
             }
-            commit.overwrite(overwritePartitions, committable, new 
HashMap<>());
+            commit.overwrite(overwritePartition, committable, new HashMap<>());
         }
 
         expire();
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 78d0044bb..b6750d5d0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -152,9 +152,7 @@ public class TestFileStore extends KeyValueFileStore {
                 false,
                 null,
                 watermark,
-                (commit, committable) -> {
-                    commit.commit(committable, Collections.emptyMap());
-                });
+                (commit, committable) -> commit.commit(committable, 
Collections.emptyMap()));
     }
 
     public List<Snapshot> commitData(
@@ -193,6 +191,23 @@ public class TestFileStore extends KeyValueFileStore {
                         commit.overwrite(partition, committable, 
Collections.emptyMap()));
     }
 
+    public Snapshot dropPartitions(List<Map<String, String>> partitions) {
+        FileStoreCommit commit = newCommit(commitUser);
+
+        SnapshotManager snapshotManager = snapshotManager();
+        Long snapshotIdBeforeCommit = snapshotManager.latestSnapshotId();
+        if (snapshotIdBeforeCommit == null) {
+            snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
+        }
+        commit.dropPartitions(partitions);
+
+        Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId();
+        assertThat(snapshotIdAfterCommit).isNotNull();
+        assertThat(snapshotIdBeforeCommit + 
1).isEqualTo(snapshotIdAfterCommit);
+
+        return snapshotManager.snapshot(snapshotIdAfterCommit);
+    }
+
     public List<Snapshot> commitDataImpl(
             List<KeyValue> kvs,
             Function<KeyValue, BinaryRow> partitionCalculator,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 5664fb7cc..fbcb4c054 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -28,11 +28,14 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.FailingFileIO;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TraceableFileIO;
 
@@ -52,6 +55,7 @@ import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Predicate;
@@ -59,6 +63,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /** Tests for {@link FileStoreCommitImpl}. */
@@ -79,7 +84,7 @@ public class FileStoreCommitTest {
     }
 
     @AfterEach
-    public void afterEach() throws Exception {
+    public void afterEach() {
         Predicate<Path> pathPredicate = path -> 
path.toString().contains(tempDir.toString());
         assertThat(FailingFileIO.openInputStreams(pathPredicate)).isEmpty();
         assertThat(FailingFileIO.openOutputStreams(pathPredicate)).isEmpty();
@@ -583,6 +588,97 @@ public class FileStoreCommitTest {
         assertThat(snapshot.watermark()).isEqualTo(2048);
     }
 
+    @Test
+    public void testDropPartitions() throws Exception {
+        // generate and commit initial data
+        Map<BinaryRow, List<KeyValue>> data =
+                generateData(ThreadLocalRandom.current().nextInt(50, 1000));
+        logData(
+                () ->
+                        data.values().stream()
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList()),
+                "data");
+
+        TestFileStore store = createStore(false);
+        store.commitData(
+                
data.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
+                gen::getPartition,
+                kv -> 0);
+
+        // generate partitions to be dropped
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int partitionsToDrop = random.nextInt(data.size()) + 1;
+        boolean specifyHr = random.nextBoolean();
+        int index = random.nextInt(data.size() - partitionsToDrop + 1);
+        List<Map<String, String>> partitions = new ArrayList<>();
+        for (int i = 0; i < partitionsToDrop; i++) {
+            Map<String, String> partition = new HashMap<>();
+            // partition 'dt'
+            partition.put("dt", new 
ArrayList<>(data.keySet()).get(index).getString(0).toString());
+            // partition 'hr'
+            if (specifyHr && random.nextBoolean()) {
+                partition.put(
+                        "hr", String.valueOf(new 
ArrayList<>(data.keySet()).get(index).getInt(1)));
+            }
+            index++;
+
+            partitions.add(partition);
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "partitionsToDrop "
+                            + partitions.stream()
+                                    .map(Objects::toString)
+                                    .collect(Collectors.joining(",")));
+        }
+
+        Snapshot snapshot = store.dropPartitions(partitions);
+
+        
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
+
+        // check data
+        RowDataToObjectArrayConverter partitionConverter =
+                new 
RowDataToObjectArrayConverter(TestKeyValueGenerator.DEFAULT_PART_TYPE);
+        org.apache.paimon.predicate.Predicate partitionFilter =
+                partitions.stream()
+                        .map(
+                                partition ->
+                                        PredicateBuilder.partition(
+                                                partition, 
TestKeyValueGenerator.DEFAULT_PART_TYPE))
+                        .reduce(PredicateBuilder::or)
+                        .get();
+
+        List<KeyValue> expectedKvs = new ArrayList<>();
+        for (Map.Entry<BinaryRow, List<KeyValue>> entry : data.entrySet()) {
+            if 
(partitionFilter.test(partitionConverter.convert(entry.getKey()))) {
+                continue;
+            }
+            expectedKvs.addAll(entry.getValue());
+        }
+        gen.sort(expectedKvs);
+        Map<BinaryRow, BinaryRow> expected = store.toKvMap(expectedKvs);
+
+        List<KeyValue> actualKvs =
+                
store.readKvsFromSnapshot(store.snapshotManager().latestSnapshotId());
+        gen.sort(actualKvs);
+        Map<BinaryRow, BinaryRow> actual = store.toKvMap(actualKvs);
+
+        logData(() -> kvMapToKvList(expected), "expected");
+        logData(() -> kvMapToKvList(actual), "actual");
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    public void testDropEmptyPartition() throws Exception {
+        TestFileStore store = createStore(false);
+        assertThatThrownBy(() -> store.dropPartitions(Collections.emptyList()))
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Partitions list cannot be empty."));
+    }
+
     private TestFileStore createStore(boolean failing) throws Exception {
         return createStore(failing, 1);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
index fb6cd33db..543d92a46 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
@@ -18,17 +18,19 @@
 
 package org.apache.paimon.flink.action;
 
-import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.operation.FileStoreCommit;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.MultipleParameterTool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 
 import static org.apache.paimon.flink.action.Action.getPartitions;
 import static org.apache.paimon.flink.action.Action.getTablePath;
@@ -38,7 +40,8 @@ public class DropPartitionAction extends ActionBase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DropPartitionAction.class);
 
-    private final BatchTableCommit commit;
+    private final List<Map<String, String>> partitions;
+    private final FileStoreCommit commit;
 
     DropPartitionAction(
             String warehouse,
@@ -46,8 +49,17 @@ public class DropPartitionAction extends ActionBase {
             String tableName,
             List<Map<String, String>> partitions) {
         super(warehouse, databaseName, tableName);
+        if (!(table instanceof FileStoreTable)) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Only FileStoreTable supports drop-partition 
action. The table type is '%s'.",
+                            table.getClass().getName()));
+        }
+
+        this.partitions = partitions;
 
-        this.commit = 
table.newBatchWriteBuilder().withOverwrite(partitions).newCommit();
+        AbstractFileStoreTable fileStoreTable = (AbstractFileStoreTable) table;
+        this.commit = 
fileStoreTable.store().newCommit(UUID.randomUUID().toString());
     }
 
     public static Optional<Action> create(String[] args) {
@@ -112,6 +124,6 @@ public class DropPartitionAction extends ActionBase {
 
     @Override
     public void run() throws Exception {
-        this.commit.commit(Collections.emptyList());
+        commit.dropPartitions(partitions);
     }
 }

Reply via email to