This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7dea08fc7 [core][bug] PartitionExpire should use proper commit identifier (#879)
7dea08fc7 is described below
commit 7dea08fc7775b6deb4e19ff0a3fac59f5a988816
Author: yuzelin <[email protected]>
AuthorDate: Wed Apr 12 18:09:21 2023 +0800
[core][bug] PartitionExpire should use proper commit identifier (#879)
---
.../apache/paimon/operation/FileStoreCommit.java | 2 +-
.../paimon/operation/FileStoreCommitImpl.java | 6 +-
.../apache/paimon/operation/PartitionExpire.java | 12 +--
.../apache/paimon/table/sink/TableCommitImpl.java | 12 +--
.../test/java/org/apache/paimon/TestFileStore.java | 2 +-
.../paimon/operation/PartitionExpireTest.java | 90 +++++++++++++++++++++-
.../paimon/flink/action/DropPartitionAction.java | 3 +-
7 files changed, 105 insertions(+), 22 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index a60488f4e..ac1d17aba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -71,5 +71,5 @@ public interface FileStoreCommit {
*
* @param partitions A list of partition {@link Map}s. NOTE: cannot be
empty!
*/
- void dropPartitions(List<Map<String, String>> partitions);
+ void dropPartitions(List<Map<String, String>> partitions, long
commitIdentifier);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 1d307896d..da0d75d56 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -35,7 +35,6 @@ import org.apache.paimon.options.MemorySize;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.types.RowType;
@@ -351,7 +350,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
@Override
- public void dropPartitions(List<Map<String, String>> partitions) {
+ public void dropPartitions(List<Map<String, String>> partitions, long
commitIdentifier) {
Preconditions.checkArgument(!partitions.isEmpty(), "Partitions list
cannot be empty.");
if (LOG.isDebugEnabled()) {
@@ -369,8 +368,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
tryOverwrite(
partitionFilter,
Collections.emptyList(),
- // identifier is MAX_VALUE to avoid conflict
- BatchWriteBuilder.COMMIT_IDENTIFIER,
+ commitIdentifier,
null,
Collections.emptyMap());
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index f7687df28..05ab8ec22 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -70,8 +70,8 @@ public class PartitionExpire {
return this;
}
- public void expire() {
- expire(LocalDateTime.now());
+ public void expire(long commitIdentifier) {
+ expire(LocalDateTime.now(), commitIdentifier);
}
@VisibleForTesting
@@ -80,14 +80,14 @@ public class PartitionExpire {
}
@VisibleForTesting
- void expire(LocalDateTime now) {
+ void expire(LocalDateTime now, long commitIdentifier) {
if (now.isAfter(lastCheck.plus(checkInterval))) {
- doExpire(now.minus(expirationTime));
+ doExpire(now.minus(expirationTime), commitIdentifier);
lastCheck = now;
}
}
- private void doExpire(LocalDateTime expireDateTime) {
+ private void doExpire(LocalDateTime expireDateTime, long commitIdentifier)
{
List<BinaryRow> partitions = readPartitions();
List<Map<String, String>> expired = new ArrayList<>();
for (BinaryRow partition : partitions) {
@@ -99,7 +99,7 @@ public class PartitionExpire {
}
if (expired.size() > 0) {
- commit.dropPartitions(expired);
+ commit.dropPartitions(expired, commitIdentifier);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 91af65175..4b411e7a4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -102,7 +102,7 @@ public class TableCommitImpl implements InnerTableCommit {
} else {
commit.overwrite(overwritePartition, committable,
Collections.emptyMap());
}
- expire();
+ expire(identifier);
}
public void commit(ManifestCommittable committable) {
@@ -114,6 +114,9 @@ public class TableCommitImpl implements InnerTableCommit {
for (ManifestCommittable committable : committables) {
commit.commit(committable, new HashMap<>());
}
+ if (!committables.isEmpty()) {
+ expire(committables.get(committables.size() - 1).identifier());
+ }
} else {
ManifestCommittable committable;
if (committables.size() > 1) {
@@ -129,18 +132,17 @@ public class TableCommitImpl implements InnerTableCommit {
committable = new ManifestCommittable(Long.MAX_VALUE);
}
commit.overwrite(overwritePartition, committable,
Collections.emptyMap());
+ expire(committable.identifier());
}
-
- expire();
}
- private void expire() {
+ private void expire(long partitionExpireIdentifier) {
if (expire != null) {
expire.expire();
}
if (partitionExpire != null) {
- partitionExpire.expire();
+ partitionExpire.expire(partitionExpireIdentifier);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 85ec2b221..4552008e1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -203,7 +203,7 @@ public class TestFileStore extends KeyValueFileStore {
if (snapshotIdBeforeCommit == null) {
snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
}
- commit.dropPartitions(partitions);
+ commit.dropPartitions(partitions, Long.MAX_VALUE);
Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId();
assertThat(snapshotIdAfterCommit).isNotNull();
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 139c8435b..7dae4405c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.operation;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
@@ -28,6 +29,8 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
@@ -40,13 +43,22 @@ import java.io.IOException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+import static
org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL;
import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
+import static org.apache.paimon.CoreOptions.PARTITION_TIMESTAMP_FORMATTER;
import static org.apache.paimon.CoreOptions.WRITE_ONLY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -102,22 +114,92 @@ public class PartitionExpireTest {
PartitionExpire expire = newExpire();
expire.setLastCheck(date(1));
- expire.expire(date(3));
+ expire.expire(date(3), Long.MAX_VALUE);
assertThat(read())
.containsExactlyInAnyOrder(
"20230101:11", "20230101:12", "20230103:31",
"20230103:32", "20230105:51");
- expire.expire(date(5));
+ expire.expire(date(5), Long.MAX_VALUE);
assertThat(read()).containsExactlyInAnyOrder("20230103:31",
"20230103:32", "20230105:51");
// PARTITION_EXPIRATION_INTERVAL not trigger
- expire.expire(date(6));
+ expire.expire(date(6), Long.MAX_VALUE);
assertThat(read()).containsExactlyInAnyOrder("20230103:31",
"20230103:32", "20230105:51");
- expire.expire(date(8));
+ expire.expire(date(8), Long.MAX_VALUE);
assertThat(read()).isEmpty();
}
+ @Test
+ public void testFilterCommittedAfterExpiring() throws Exception {
+ SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(),
path);
+ schemaManager.createTable(
+ new Schema(
+ RowType.of(VarCharType.STRING_TYPE,
VarCharType.STRING_TYPE).getFields(),
+ Collections.singletonList("f0"),
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ ""));
+
+ table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+ // disable compaction and snapshot expiration
+ table = table.copy(Collections.singletonMap(WRITE_ONLY.key(), "true"));
+ String commitUser = UUID.randomUUID().toString();
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ // prepare commits
+ int now =
+ Integer.parseInt(
+
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")));
+ int preparedCommits = random.nextInt(20, 30);
+
+ List<List<CommitMessage>> commitMessages = new ArrayList<>();
+ Set<Long> notCommitted = new HashSet<>();
+ for (int i = 0; i < preparedCommits; i++) {
+ // ensure the partition will be expired
+ String f0 = String.valueOf(now - random.nextInt(10));
+ String f1 = String.valueOf(random.nextInt(25));
+ StreamTableWrite write = table.newWrite(commitUser);
+ write.write(GenericRow.of(BinaryString.fromString(f0),
BinaryString.fromString(f1)));
+ commitMessages.add(write.prepareCommit(false, i));
+ notCommitted.add((long) i);
+ }
+
+ // commit a part of data and trigger partition expire
+ int successCommits = random.nextInt(preparedCommits / 4,
preparedCommits / 2);
+ StreamTableCommit commit = table.newCommit(commitUser);
+ for (int i = 0; i < successCommits - 2; i++) {
+ commit.commit(i, commitMessages.get(i));
+ notCommitted.remove((long) i);
+ }
+
+ // we need two commits to trigger partition expire
+ // the first commit will set the last check time to now
+ // the second commit will do the partition expire
+ Map<String, String> options = new HashMap<>();
+ options.put(WRITE_ONLY.key(), "false");
+ options.put(PARTITION_EXPIRATION_TIME.key(), "1 d");
+ options.put(PARTITION_EXPIRATION_CHECK_INTERVAL.key(), "5 s");
+ options.put(PARTITION_TIMESTAMP_FORMATTER.key(), "yyyyMMdd");
+ table = table.copy(options);
+ commit = table.newCommit(commitUser);
+ commit.commit(successCommits - 2, commitMessages.get(successCommits -
2));
+ notCommitted.remove((long) (successCommits - 2));
+ Thread.sleep(5000);
+ commit.commit(successCommits - 1, commitMessages.get(successCommits -
1));
+ notCommitted.remove((long) (successCommits - 1));
+
+ // check whether partition expire is triggered
+ Snapshot snapshot = table.snapshotManager().latestSnapshot();
+
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
+
+ // check filter
+ Set<Long> toBeFiltered =
+ LongStream.range(0,
preparedCommits).boxed().collect(Collectors.toSet());
+ assertThat(commit.filterCommitted(toBeFiltered))
+ .containsExactlyInAnyOrderElementsOf(notCommitted);
+ }
+
private List<String> read() throws IOException {
List<String> ret = new ArrayList<>();
table.newRead()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
index 543d92a46..21f38e5eb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.action;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.MultipleParameterTool;
@@ -124,6 +125,6 @@ public class DropPartitionAction extends ActionBase {
@Override
public void run() throws Exception {
- commit.dropPartitions(partitions);
+ commit.dropPartitions(partitions, BatchWriteBuilder.COMMIT_IDENTIFIER);
}
}