This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 81e60e3dd [core] BatchWriteBuilder#withOverwrite should not accept multiple partitions list (#766)
81e60e3dd is described below
commit 81e60e3dd011861449960afb37ae6f07b8941838
Author: yuzelin <[email protected]>
AuthorDate: Thu Mar 30 14:17:04 2023 +0800
[core] BatchWriteBuilder#withOverwrite should not accept multiple partitions list (#766)
---
paimon-core/pom.xml | 7 ++
.../apache/paimon/operation/FileStoreCommit.java | 32 +++----
.../paimon/operation/FileStoreCommitImpl.java | 68 ++++++++-------
.../apache/paimon/operation/PartitionExpire.java | 6 +-
.../paimon/table/sink/BatchWriteBuilder.java | 11 +--
.../paimon/table/sink/BatchWriteBuilderImpl.java | 11 ++-
.../apache/paimon/table/sink/InnerTableCommit.java | 13 +--
.../apache/paimon/table/sink/TableCommitImpl.java | 14 ++--
.../test/java/org/apache/paimon/TestFileStore.java | 21 ++++-
.../paimon/operation/FileStoreCommitTest.java | 98 +++++++++++++++++++++-
.../paimon/flink/action/DropPartitionAction.java | 22 +++--
11 files changed, 211 insertions(+), 92 deletions(-)
diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index 8a7f63df8..b21cb2fe3 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -145,6 +145,13 @@ under the License.
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index 41e69a94b..2c114b830 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -18,9 +18,9 @@
package org.apache.paimon.operation;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.manifest.ManifestCommittable;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -52,24 +52,26 @@ public interface FileStoreCommit {
/** Commit from manifest committable. */
void commit(ManifestCommittable committable, Map<String, String> properties);
- /** Overwrite a single partition from manifest committable. */
- default void overwrite(
- Map<String, String> partition,
- ManifestCommittable committable,
- Map<String, String> properties) {
- overwrite(Collections.singletonList(partition), committable, properties);
- }
-
/**
- * Overwrite multiple partitions from manifest committable.
+ * Overwrite from manifest committable and partition.
+ *
+ * <p>TODO: The method's semantics can be dynamic or static overwrite according to properties.
*
- * @param partitions A list of partition {@link Map}s that maps each partition key to a
- * partition value. Depending on the user-defined statement, the partition might not include
- * all partition keys. Also note that this partition does not necessarily equal to the
- * partitions of the newly added key-values. This is just the partition to be cleaned up.
+ * @param partition A single partition maps each partition key to a partition value. Depending
+ * on the user-defined statement, the partition might not include all partition keys. Also
+ * note that this partition does not necessarily equal to the partitions of the newly added
+ * key-values. This is just the partition to be cleaned up.
*/
void overwrite(
- List<Map<String, String>> partitions,
+ Map<String, String> partition,
ManifestCommittable committable,
Map<String, String> properties);
+
+ /**
+ * Drop multiple partitions. The {@link Snapshot.CommitKind} of generated snapshot is {@link
+ * Snapshot.CommitKind#OVERWRITE}.
+ *
+ * @param partitions A list of partition {@link Map}s. NOTE: cannot be empty!
+ */
+ void dropPartitions(List<Map<String, String>> partitions);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 685fe6d00..a7f7d7e38 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -34,10 +34,12 @@ import org.apache.paimon.options.MemorySize;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.SnapshotManager;
@@ -246,15 +248,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
@Override
public void overwrite(
- List<Map<String, String>> partitions,
+ Map<String, String> partition,
ManifestCommittable committable,
Map<String, String> properties) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Ready to overwrite partition "
- + partitions.stream()
- .map(Object::toString)
- .collect(Collectors.joining(","))
+ + partition.toString()
+ "\n"
+ committable.toString());
}
@@ -287,35 +287,20 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
// sanity check, all changes must be done within the given partition
- List<Predicate> partitionFilters = new ArrayList<>();
- for (Map<String, String> partition : partitions) {
- Predicate partitionFilter = PredicateBuilder.partition(partition, partitionType);
- if (partitionFilter != null) {
- for (ManifestEntry entry : appendTableFiles) {
- if (!partitionFilter.test(
- partitionObjectConverter.convert(entry.partition()))) {
- throw new IllegalArgumentException(
- "Trying to overwrite partition "
- + partition
- + ", but the changes in "
- + pathFactory.getPartitionString(entry.partition())
- + " does not belong to this partition");
- }
+ Predicate partitionFilter = PredicateBuilder.partition(partition, partitionType);
+ if (partitionFilter != null) {
+ for (ManifestEntry entry : appendTableFiles) {
+ if (!partitionFilter.test(partitionObjectConverter.convert(entry.partition()))) {
+ throw new IllegalArgumentException(
+ "Trying to overwrite partition "
+ + partition
+ + ", but the changes in "
+ + pathFactory.getPartitionString(entry.partition())
+ + " does not belong to this partition");
}
-
- partitionFilters.add(partitionFilter);
}
}
- Predicate partitionFilter;
- if (partitionFilters.size() == 0) {
- partitionFilter = null;
- } else if (partitionFilters.size() == 1) {
- partitionFilter = partitionFilters.get(0);
- } else {
- partitionFilter = PredicateBuilder.or(partitionFilters);
- }
-
// overwrite new files
tryOverwrite(
partitionFilter,
@@ -336,6 +321,31 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
}
+ @Override
+ public void dropPartitions(List<Map<String, String>> partitions) {
+ Preconditions.checkArgument(!partitions.isEmpty(), "Partitions list cannot be empty.");
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Ready to drop partitions {}",
+ partitions.stream().map(Objects::toString).collect(Collectors.joining(",")));
+ }
+
+ Predicate partitionFilter =
+ partitions.stream()
+ .map(partition -> PredicateBuilder.partition(partition, partitionType))
+ .reduce(PredicateBuilder::or)
+ .orElseThrow(() -> new RuntimeException("Failed to get partition filter."));
+
+ tryOverwrite(
+ partitionFilter,
+ Collections.emptyList(),
+ // identifier is MAX_VALUE to avoid conflict
+ BatchWriteBuilder.COMMIT_IDENTIFIER,
+ null,
+ Collections.emptyMap());
+ }
+
private void collectChanges(
List<CommitMessage> commitMessages,
List<ManifestEntry> appendTableFiles,
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index 74c5eca48..f7687df28 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -20,7 +20,6 @@ package org.apache.paimon.operation;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.partition.PartitionTimeExtractor;
import org.apache.paimon.types.RowType;
@@ -30,7 +29,6 @@ import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -101,9 +99,7 @@ public class PartitionExpire {
}
if (expired.size() > 0) {
- // identifier is MAX_VALUE to avoid conflict.
- ManifestCommittable committable = new ManifestCommittable(Long.MAX_VALUE);
- commit.overwrite(expired, committable, Collections.emptyMap());
+ commit.dropPartitions(expired);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java
index abcab70c9..fbfcd250f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java
@@ -24,7 +24,6 @@ import org.apache.paimon.data.InternalRow;
import javax.annotation.Nullable;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
/**
@@ -63,15 +62,7 @@ public interface BatchWriteBuilder extends WriteBuilder {
}
/** Overwrite writing, same as the 'INSERT OVERWRITE T PARTITION (...)' semantics of SQL. */
- default BatchWriteBuilder withOverwrite(@Nullable Map<String, String> staticPartition) {
- if (staticPartition != null) {
- withOverwrite(Collections.singletonList(staticPartition));
- }
- return this;
- }
-
- /** Overwrite writing, multiple static partitions can be specified. */
- BatchWriteBuilder withOverwrite(@Nullable List<Map<String, String>> staticPartitions);
+ BatchWriteBuilder withOverwrite(@Nullable Map<String, String> staticPartition);
/** Create a {@link TableWrite} to write {@link InternalRow}s. */
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
index a1342e652..407390f4c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
@@ -23,7 +23,6 @@ import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -35,7 +34,7 @@ public class BatchWriteBuilderImpl implements BatchWriteBuilder {
private final InnerTable table;
private final String commitUser = UUID.randomUUID().toString();
- private List<Map<String, String>> staticPartitions;
+ private Map<String, String> staticPartition;
public BatchWriteBuilderImpl(InnerTable table) {
this.table = table;
@@ -52,19 +51,19 @@ public class BatchWriteBuilderImpl implements BatchWriteBuilder {
}
@Override
- public BatchWriteBuilder withOverwrite(@Nullable List<Map<String, String>> staticPartitions) {
- this.staticPartitions = staticPartitions;
+ public BatchWriteBuilder withOverwrite(@Nullable Map<String, String> staticPartition) {
+ this.staticPartition = staticPartition;
return this;
}
@Override
public BatchTableWrite newWrite() {
- return table.newWrite(commitUser).withOverwrite(staticPartitions != null);
+ return table.newWrite(commitUser).withOverwrite(staticPartition != null);
}
@Override
public BatchTableCommit newCommit() {
- InnerTableCommit commit = table.newCommit(commitUser).withOverwrite(staticPartitions);
+ InnerTableCommit commit = table.newCommit(commitUser).withOverwrite(staticPartition);
commit.ignoreEmptyCommit(true);
return commit;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
index 2f4e5430c..f5bbcabcc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
@@ -22,26 +22,17 @@ import org.apache.paimon.operation.Lock;
import javax.annotation.Nullable;
-import java.util.Collections;
-import java.util.List;
import java.util.Map;
/** Inner {@link TableCommit} contains overwrite setter. */
public interface InnerTableCommit extends StreamTableCommit, BatchTableCommit {
/** Overwrite writing, same as the 'INSERT OVERWRITE T PARTITION (...)' semantics of SQL. */
- default InnerTableCommit withOverwrite(@Nullable Map<String, String> staticPartition) {
- if (staticPartition != null) {
- withOverwrite(Collections.singletonList(staticPartition));
- }
- return this;
- }
-
- InnerTableCommit withOverwrite(@Nullable List<Map<String, String>> overwritePartitions);
+ InnerTableCommit withOverwrite(@Nullable Map<String, String> staticPartition);
/**
* If this is set to true, when there is no new data, no snapshot will be generated. By default,
- * empty commit is be ignored.
+ * empty commit is ignored.
*
* <ul>
* <li>For Streaming: the default value of 'ignoreEmptyCommit' is false.
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 190e5f863..0236e27ad 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -44,7 +44,7 @@ public class TableCommitImpl implements InnerTableCommit {
@Nullable private final FileStoreExpire expire;
@Nullable private final PartitionExpire partitionExpire;
- @Nullable private List<Map<String, String>> overwritePartitions = null;
+ @Nullable private Map<String, String> overwritePartition = null;
@Nullable private Lock lock;
private boolean batchCommitted = false;
@@ -59,8 +59,8 @@ public class TableCommitImpl implements InnerTableCommit {
}
@Override
- public TableCommitImpl withOverwrite(@Nullable List<Map<String, String>> overwritePartitions) {
- this.overwritePartitions = overwritePartitions;
+ public TableCommitImpl withOverwrite(@Nullable Map<String, String> overwritePartitions) {
+ this.overwritePartition = overwritePartitions;
return this;
}
@@ -97,10 +97,10 @@ public class TableCommitImpl implements InnerTableCommit {
for (CommitMessage commitMessage : commitMessages) {
committable.addFileCommittable(commitMessage);
}
- if (overwritePartitions == null) {
+ if (overwritePartition == null) {
commit.commit(committable, new HashMap<>());
} else {
- commit.overwrite(overwritePartitions, committable, new HashMap<>());
+ commit.overwrite(overwritePartition, committable, new HashMap<>());
}
expire();
}
@@ -110,7 +110,7 @@ public class TableCommitImpl implements InnerTableCommit {
}
public void commitMultiple(List<ManifestCommittable> committables) {
- if (overwritePartitions == null) {
+ if (overwritePartition == null) {
for (ManifestCommittable committable : committables) {
commit.commit(committable, new HashMap<>());
}
@@ -128,7 +128,7 @@ public class TableCommitImpl implements InnerTableCommit {
// TODO maybe it can be produced by CommitterOperator
committable = new ManifestCommittable(Long.MAX_VALUE);
}
- commit.overwrite(overwritePartitions, committable, new HashMap<>());
+ commit.overwrite(overwritePartition, committable, new HashMap<>());
}
expire();
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 78d0044bb..b6750d5d0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -152,9 +152,7 @@ public class TestFileStore extends KeyValueFileStore {
false,
null,
watermark,
- (commit, committable) -> {
- commit.commit(committable, Collections.emptyMap());
- });
+ (commit, committable) -> commit.commit(committable, Collections.emptyMap()));
}
public List<Snapshot> commitData(
@@ -193,6 +191,23 @@ public class TestFileStore extends KeyValueFileStore {
commit.overwrite(partition, committable, Collections.emptyMap()));
}
+ public Snapshot dropPartitions(List<Map<String, String>> partitions) {
+ FileStoreCommit commit = newCommit(commitUser);
+
+ SnapshotManager snapshotManager = snapshotManager();
+ Long snapshotIdBeforeCommit = snapshotManager.latestSnapshotId();
+ if (snapshotIdBeforeCommit == null) {
+ snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
+ }
+ commit.dropPartitions(partitions);
+
+ Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId();
+ assertThat(snapshotIdAfterCommit).isNotNull();
+ assertThat(snapshotIdBeforeCommit + 1).isEqualTo(snapshotIdAfterCommit);
+
+ return snapshotManager.snapshot(snapshotIdAfterCommit);
+ }
+
public List<Snapshot> commitDataImpl(
List<KeyValue> kvs,
Function<KeyValue, BinaryRow> partitionCalculator,
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 5664fb7cc..fbcb4c054 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -28,11 +28,14 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.FailingFileIO;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TraceableFileIO;
@@ -52,6 +55,7 @@ import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
@@ -59,6 +63,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertThrows;
/** Tests for {@link FileStoreCommitImpl}. */
@@ -79,7 +84,7 @@ public class FileStoreCommitTest {
}
@AfterEach
- public void afterEach() throws Exception {
+ public void afterEach() {
Predicate<Path> pathPredicate = path -> path.toString().contains(tempDir.toString());
assertThat(FailingFileIO.openInputStreams(pathPredicate)).isEmpty();
assertThat(FailingFileIO.openOutputStreams(pathPredicate)).isEmpty();
@@ -583,6 +588,97 @@ public class FileStoreCommitTest {
assertThat(snapshot.watermark()).isEqualTo(2048);
}
+ @Test
+ public void testDropPartitions() throws Exception {
+ // generate and commit initial data
+ Map<BinaryRow, List<KeyValue>> data =
+ generateData(ThreadLocalRandom.current().nextInt(50, 1000));
+ logData(
+ () ->
+ data.values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList()),
+ "data");
+
+ TestFileStore store = createStore(false);
+ store.commitData(
+ data.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
+ gen::getPartition,
+ kv -> 0);
+
+ // generate partitions to be dropped
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int partitionsToDrop = random.nextInt(data.size()) + 1;
+ boolean specifyHr = random.nextBoolean();
+ int index = random.nextInt(data.size() - partitionsToDrop + 1);
+ List<Map<String, String>> partitions = new ArrayList<>();
+ for (int i = 0; i < partitionsToDrop; i++) {
+ Map<String, String> partition = new HashMap<>();
+ // partition 'dt'
+ partition.put("dt", new ArrayList<>(data.keySet()).get(index).getString(0).toString());
+ // partition 'hr'
+ if (specifyHr && random.nextBoolean()) {
+ partition.put(
+ "hr", String.valueOf(new ArrayList<>(data.keySet()).get(index).getInt(1)));
+ }
+ index++;
+
+ partitions.add(partition);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "partitionsToDrop "
+ + partitions.stream()
+ .map(Objects::toString)
+ .collect(Collectors.joining(",")));
+ }
+
+ Snapshot snapshot = store.dropPartitions(partitions);
+
+ assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
+
+ // check data
+ RowDataToObjectArrayConverter partitionConverter =
+ new RowDataToObjectArrayConverter(TestKeyValueGenerator.DEFAULT_PART_TYPE);
+ org.apache.paimon.predicate.Predicate partitionFilter =
+ partitions.stream()
+ .map(
+ partition ->
+ PredicateBuilder.partition(
+ partition, TestKeyValueGenerator.DEFAULT_PART_TYPE))
+ .reduce(PredicateBuilder::or)
+ .get();
+
+ List<KeyValue> expectedKvs = new ArrayList<>();
+ for (Map.Entry<BinaryRow, List<KeyValue>> entry : data.entrySet()) {
+ if (partitionFilter.test(partitionConverter.convert(entry.getKey()))) {
+ continue;
+ }
+ expectedKvs.addAll(entry.getValue());
+ }
+ gen.sort(expectedKvs);
+ Map<BinaryRow, BinaryRow> expected = store.toKvMap(expectedKvs);
+
+ List<KeyValue> actualKvs =
+ store.readKvsFromSnapshot(store.snapshotManager().latestSnapshotId());
+ gen.sort(actualKvs);
+ Map<BinaryRow, BinaryRow> actual = store.toKvMap(actualKvs);
+
+ logData(() -> kvMapToKvList(expected), "expected");
+ logData(() -> kvMapToKvList(actual), "actual");
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ @Test
+ public void testDropEmptyPartition() throws Exception {
+ TestFileStore store = createStore(false);
+ assertThatThrownBy(() -> store.dropPartitions(Collections.emptyList()))
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ IllegalArgumentException.class,
+ "Partitions list cannot be empty."));
+ }
+
private TestFileStore createStore(boolean failing) throws Exception {
return createStore(failing, 1);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
index fb6cd33db..543d92a46 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
@@ -18,17 +18,19 @@
package org.apache.paimon.flink.action;
-import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.operation.FileStoreCommit;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.UUID;
import static org.apache.paimon.flink.action.Action.getPartitions;
import static org.apache.paimon.flink.action.Action.getTablePath;
@@ -38,7 +40,8 @@ public class DropPartitionAction extends ActionBase {
private static final Logger LOG = LoggerFactory.getLogger(DropPartitionAction.class);
- private final BatchTableCommit commit;
+ private final List<Map<String, String>> partitions;
+ private final FileStoreCommit commit;
DropPartitionAction(
String warehouse,
@@ -46,8 +49,17 @@ public class DropPartitionAction extends ActionBase {
String tableName,
List<Map<String, String>> partitions) {
super(warehouse, databaseName, tableName);
+ if (!(table instanceof FileStoreTable)) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Only FileStoreTable supports drop-partition action. The table type is '%s'.",
+ table.getClass().getName()));
+ }
+
+ this.partitions = partitions;
- this.commit = table.newBatchWriteBuilder().withOverwrite(partitions).newCommit();
+ AbstractFileStoreTable fileStoreTable = (AbstractFileStoreTable) table;
+ this.commit = fileStoreTable.store().newCommit(UUID.randomUUID().toString());
}
public static Optional<Action> create(String[] args) {
@@ -112,6 +124,6 @@ public class DropPartitionAction extends ActionBase {
@Override
public void run() throws Exception {
- this.commit.commit(Collections.emptyList());
+ commit.dropPartitions(partitions);
}
}