This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dea08fc7 [core][bug] PartitionExpire should use proper commit identifier (#879)
7dea08fc7 is described below

commit 7dea08fc7775b6deb4e19ff0a3fac59f5a988816
Author: yuzelin <[email protected]>
AuthorDate: Wed Apr 12 18:09:21 2023 +0800

    [core][bug] PartitionExpire should use proper commit identifier (#879)
---
 .../apache/paimon/operation/FileStoreCommit.java   |  2 +-
 .../paimon/operation/FileStoreCommitImpl.java      |  6 +-
 .../apache/paimon/operation/PartitionExpire.java   | 12 +--
 .../apache/paimon/table/sink/TableCommitImpl.java  | 12 +--
 .../test/java/org/apache/paimon/TestFileStore.java |  2 +-
 .../paimon/operation/PartitionExpireTest.java      | 90 +++++++++++++++++++++-
 .../paimon/flink/action/DropPartitionAction.java   |  3 +-
 7 files changed, 105 insertions(+), 22 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index a60488f4e..ac1d17aba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -71,5 +71,5 @@ public interface FileStoreCommit {
      *
      * @param partitions A list of partition {@link Map}s. NOTE: cannot be empty!
      */
-    void dropPartitions(List<Map<String, String>> partitions);
+    void dropPartitions(List<Map<String, String>> partitions, long commitIdentifier);
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 1d307896d..da0d75d56 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -35,7 +35,6 @@ import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.types.RowType;
@@ -351,7 +350,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     }
 
     @Override
-    public void dropPartitions(List<Map<String, String>> partitions) {
+    public void dropPartitions(List<Map<String, String>> partitions, long commitIdentifier) {
         Preconditions.checkArgument(!partitions.isEmpty(), "Partitions list cannot be empty.");
 
         if (LOG.isDebugEnabled()) {
@@ -369,8 +368,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         tryOverwrite(
                 partitionFilter,
                 Collections.emptyList(),
-                // identifier is MAX_VALUE to avoid conflict
-                BatchWriteBuilder.COMMIT_IDENTIFIER,
+                commitIdentifier,
                 null,
                 Collections.emptyMap());
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index f7687df28..05ab8ec22 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -70,8 +70,8 @@ public class PartitionExpire {
         return this;
     }
 
-    public void expire() {
-        expire(LocalDateTime.now());
+    public void expire(long commitIdentifier) {
+        expire(LocalDateTime.now(), commitIdentifier);
     }
 
     @VisibleForTesting
@@ -80,14 +80,14 @@ public class PartitionExpire {
     }
 
     @VisibleForTesting
-    void expire(LocalDateTime now) {
+    void expire(LocalDateTime now, long commitIdentifier) {
         if (now.isAfter(lastCheck.plus(checkInterval))) {
-            doExpire(now.minus(expirationTime));
+            doExpire(now.minus(expirationTime), commitIdentifier);
             lastCheck = now;
         }
     }
 
-    private void doExpire(LocalDateTime expireDateTime) {
+    private void doExpire(LocalDateTime expireDateTime, long commitIdentifier) {
         List<BinaryRow> partitions = readPartitions();
         List<Map<String, String>> expired = new ArrayList<>();
         for (BinaryRow partition : partitions) {
@@ -99,7 +99,7 @@ public class PartitionExpire {
         }
 
         if (expired.size() > 0) {
-            commit.dropPartitions(expired);
+            commit.dropPartitions(expired, commitIdentifier);
         }
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 91af65175..4b411e7a4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -102,7 +102,7 @@ public class TableCommitImpl implements InnerTableCommit {
         } else {
             commit.overwrite(overwritePartition, committable, Collections.emptyMap());
         }
-        expire();
+        expire(identifier);
     }
 
     public void commit(ManifestCommittable committable) {
@@ -114,6 +114,9 @@ public class TableCommitImpl implements InnerTableCommit {
             for (ManifestCommittable committable : committables) {
                 commit.commit(committable, new HashMap<>());
             }
+            if (!committables.isEmpty()) {
+                expire(committables.get(committables.size() - 1).identifier());
+            }
         } else {
             ManifestCommittable committable;
             if (committables.size() > 1) {
@@ -129,18 +132,17 @@ public class TableCommitImpl implements InnerTableCommit {
                 committable = new ManifestCommittable(Long.MAX_VALUE);
             }
             commit.overwrite(overwritePartition, committable, Collections.emptyMap());
+            expire(committable.identifier());
         }
-
-        expire();
     }
 
-    private void expire() {
+    private void expire(long partitionExpireIdentifier) {
         if (expire != null) {
             expire.expire();
         }
 
         if (partitionExpire != null) {
-            partitionExpire.expire();
+            partitionExpire.expire(partitionExpireIdentifier);
         }
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 85ec2b221..4552008e1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -203,7 +203,7 @@ public class TestFileStore extends KeyValueFileStore {
         if (snapshotIdBeforeCommit == null) {
             snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
         }
-        commit.dropPartitions(partitions);
+        commit.dropPartitions(partitions, Long.MAX_VALUE);
 
         Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId();
         assertThat(snapshotIdAfterCommit).isNotNull();
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 139c8435b..7dae4405c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.operation;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.Path;
@@ -28,6 +29,8 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
@@ -40,13 +43,22 @@ import java.io.IOException;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
 
+import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL;
 import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
+import static org.apache.paimon.CoreOptions.PARTITION_TIMESTAMP_FORMATTER;
 import static org.apache.paimon.CoreOptions.WRITE_ONLY;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -102,22 +114,92 @@ public class PartitionExpireTest {
         PartitionExpire expire = newExpire();
         expire.setLastCheck(date(1));
 
-        expire.expire(date(3));
+        expire.expire(date(3), Long.MAX_VALUE);
         assertThat(read())
                 .containsExactlyInAnyOrder(
                         "20230101:11", "20230101:12", "20230103:31", "20230103:32", "20230105:51");
 
-        expire.expire(date(5));
+        expire.expire(date(5), Long.MAX_VALUE);
         assertThat(read()).containsExactlyInAnyOrder("20230103:31", "20230103:32", "20230105:51");
 
         // PARTITION_EXPIRATION_INTERVAL not trigger
-        expire.expire(date(6));
+        expire.expire(date(6), Long.MAX_VALUE);
         assertThat(read()).containsExactlyInAnyOrder("20230103:31", "20230103:32", "20230105:51");
 
-        expire.expire(date(8));
+        expire.expire(date(8), Long.MAX_VALUE);
         assertThat(read()).isEmpty();
     }
 
+    @Test
+    public void testFilterCommittedAfterExpiring() throws Exception {
+        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), path);
+        schemaManager.createTable(
+                new Schema(
+                        RowType.of(VarCharType.STRING_TYPE, VarCharType.STRING_TYPE).getFields(),
+                        Collections.singletonList("f0"),
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        ""));
+
+        table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+        // disable compaction and snapshot expiration
+        table = table.copy(Collections.singletonMap(WRITE_ONLY.key(), "true"));
+        String commitUser = UUID.randomUUID().toString();
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        // prepare commits
+        int now =
+                Integer.parseInt(
+                        LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")));
+        int preparedCommits = random.nextInt(20, 30);
+
+        List<List<CommitMessage>> commitMessages = new ArrayList<>();
+        Set<Long> notCommitted = new HashSet<>();
+        for (int i = 0; i < preparedCommits; i++) {
+            // ensure the partition will be expired
+            String f0 = String.valueOf(now - random.nextInt(10));
+            String f1 = String.valueOf(random.nextInt(25));
+            StreamTableWrite write = table.newWrite(commitUser);
+            write.write(GenericRow.of(BinaryString.fromString(f0), BinaryString.fromString(f1)));
+            commitMessages.add(write.prepareCommit(false, i));
+            notCommitted.add((long) i);
+        }
+
+        // commit a part of data and trigger partition expire
+        int successCommits = random.nextInt(preparedCommits / 4, preparedCommits / 2);
+        StreamTableCommit commit = table.newCommit(commitUser);
+        for (int i = 0; i < successCommits - 2; i++) {
+            commit.commit(i, commitMessages.get(i));
+            notCommitted.remove((long) i);
+        }
+
+        // we need two commits to trigger partition expire
+        // the first commit will set the last check time to now
+        // the second commit will do the partition expire
+        Map<String, String> options = new HashMap<>();
+        options.put(WRITE_ONLY.key(), "false");
+        options.put(PARTITION_EXPIRATION_TIME.key(), "1 d");
+        options.put(PARTITION_EXPIRATION_CHECK_INTERVAL.key(), "5 s");
+        options.put(PARTITION_TIMESTAMP_FORMATTER.key(), "yyyyMMdd");
+        table = table.copy(options);
+        commit = table.newCommit(commitUser);
+        commit.commit(successCommits - 2, commitMessages.get(successCommits - 2));
+        notCommitted.remove((long) (successCommits - 2));
+        Thread.sleep(5000);
+        commit.commit(successCommits - 1, commitMessages.get(successCommits - 1));
+        notCommitted.remove((long) (successCommits - 1));
+
+        // check whether partition expire is triggered
+        Snapshot snapshot = table.snapshotManager().latestSnapshot();
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
+
+        // check filter
+        Set<Long> toBeFiltered =
+                LongStream.range(0, preparedCommits).boxed().collect(Collectors.toSet());
+        assertThat(commit.filterCommitted(toBeFiltered))
+                .containsExactlyInAnyOrderElementsOf(notCommitted);
+    }
+
     private List<String> read() throws IOException {
         List<String> ret = new ArrayList<>();
         table.newRead()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
index 543d92a46..21f38e5eb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.action;
 import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
 
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.MultipleParameterTool;
@@ -124,6 +125,6 @@ public class DropPartitionAction extends ActionBase {
 
     @Override
     public void run() throws Exception {
-        commit.dropPartitions(partitions);
+        commit.dropPartitions(partitions, BatchWriteBuilder.COMMIT_IDENTIFIER);
     }
 }

Reply via email to