This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b80e143e [core] FileStoreCommit should check append files only in recovering (#3112)
3b80e143e is described below

commit 3b80e143e21a515e1f6a7f9d6853fbabc1384849
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 1 17:32:25 2024 +0800

    [core] FileStoreCommit should check append files only in recovering (#3112)
---
 .../apache/paimon/operation/FileStoreCommit.java   | 19 +++------
 .../paimon/operation/FileStoreCommitImpl.java      | 45 +++++++++++++++++-----
 .../paimon/table/sink/StreamTableCommit.java       | 12 ------
 .../apache/paimon/table/sink/TableCommitImpl.java  | 13 ++-----
 .../apache/paimon/operation/FileDeletionTest.java  |  3 +-
 .../paimon/operation/FileStoreCommitTest.java      | 21 +++++-----
 .../paimon/operation/PartitionExpireTest.java      | 32 +++++++--------
 .../apache/paimon/operation/TestCommitThread.java  |  3 +-
 .../apache/paimon/flink/sink/StoreCommitter.java   |  2 +-
 9 files changed, 78 insertions(+), 72 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index 7151f2512..f43308c30 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -29,7 +29,6 @@ import org.apache.paimon.utils.FileStorePathFactory;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 /** Commit operation which provides commit and overwrite. */
 public interface FileStoreCommit {
@@ -39,24 +38,18 @@ public interface FileStoreCommit {
 
     FileStoreCommit ignoreEmptyCommit(boolean ignoreEmptyCommit);
 
-    /** Find out which manifest committable need to be retried when recovering 
from the failure. */
-    default List<ManifestCommittable> 
filterCommitted(List<ManifestCommittable> committableList) {
-        Set<Long> identifiers =
-                filterCommitted(
-                        committableList.stream()
-                                .map(ManifestCommittable::identifier)
-                                .collect(Collectors.toSet()));
-        return committableList.stream()
-                .filter(m -> identifiers.contains(m.identifier()))
-                .collect(Collectors.toList());
-    }
-
     /** Find out which commit identifier need to be retried when recovering 
from the failure. */
     Set<Long> filterCommitted(Set<Long> commitIdentifiers);
 
     /** Commit from manifest committable. */
     void commit(ManifestCommittable committable, Map<String, String> 
properties);
 
+    /** Commit from manifest committable with checkAppendFiles. */
+    void commit(
+            ManifestCommittable committable,
+            Map<String, String> properties,
+            boolean checkAppendFiles);
+
     /**
      * Overwrite from manifest committable and partition.
      *
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 3f2ad87ac..19819357c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -205,6 +205,14 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
 
     @Override
     public void commit(ManifestCommittable committable, Map<String, String> 
properties) {
+        commit(committable, properties, false);
+    }
+
+    @Override
+    public void commit(
+            ManifestCommittable committable,
+            Map<String, String> properties,
+            boolean checkAppendFiles) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Ready to commit\n" + committable.toString());
         }
@@ -244,7 +252,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                 // This optimization is mainly used to decrease the number of 
times we read from
                 // files.
                 latestSnapshot = snapshotManager.latestSnapshot(branchName);
-                if (latestSnapshot != null) {
+                if (latestSnapshot != null && checkAppendFiles) {
                     // it is possible that some partitions only have compact 
changes,
                     // so we need to contain all changes
                     baseEntries.addAll(
@@ -264,7 +272,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                                 committable.watermark(),
                                 committable.logOffsets(),
                                 Snapshot.CommitKind.APPEND,
-                                safeLatestSnapshotId,
+                                noConflictCheck(),
                                 branchName,
                                 null);
                 generatedSnapshot += 1;
@@ -299,7 +307,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                                 committable.watermark(),
                                 committable.logOffsets(),
                                 Snapshot.CommitKind.COMPACT,
-                                safeLatestSnapshotId,
+                                hasConflictChecked(safeLatestSnapshotId),
                                 branchName,
                                 null);
                 generatedSnapshot += 1;
@@ -447,7 +455,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                                 committable.watermark(),
                                 committable.logOffsets(),
                                 Snapshot.CommitKind.COMPACT,
-                                null,
+                                mustConflictCheck(),
                                 branchName,
                                 null);
                 generatedSnapshot += 1;
@@ -543,7 +551,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                 null,
                 Collections.emptyMap(),
                 Snapshot.CommitKind.ANALYZE,
-                null,
+                noConflictCheck(),
                 branchName,
                 statsFileName);
     }
@@ -641,7 +649,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             @Nullable Long watermark,
             Map<Integer, Long> logOffsets,
             Snapshot.CommitKind commitKind,
-            @Nullable Long safeLatestSnapshotId,
+            ConflictCheck conflictCheck,
             String branchName,
             @Nullable String statsFileName) {
         int cnt = 0;
@@ -657,7 +665,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                     logOffsets,
                     commitKind,
                     latestSnapshot,
-                    safeLatestSnapshotId,
+                    conflictCheck,
                     branchName,
                     statsFileName)) {
                 break;
@@ -719,7 +727,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                     logOffsets,
                     Snapshot.CommitKind.OVERWRITE,
                     latestSnapshot,
-                    null,
+                    mustConflictCheck(),
                     branchName,
                     null)) {
                 break;
@@ -738,7 +746,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             Map<Integer, Long> logOffsets,
             Snapshot.CommitKind commitKind,
             @Nullable Snapshot latestSnapshot,
-            @Nullable Long safeLatestSnapshotId,
+            ConflictCheck conflictCheck,
             String branchName,
             @Nullable String newStatsFileName) {
         long newSnapshotId =
@@ -759,7 +767,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             }
         }
 
-        if (latestSnapshot != null && !Objects.equals(latestSnapshot.id(), 
safeLatestSnapshotId)) {
+        if (latestSnapshot != null && 
conflictCheck.shouldCheck(latestSnapshot.id())) {
             // latestSnapshotId is different from the snapshot id we've 
checked for conflicts,
             // so we have to check again
             noConflictsOrFail(latestSnapshot.commitUser(), latestSnapshot, 
tableFiles);
@@ -1209,4 +1217,21 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
             return Objects.hash(partition, bucket, level);
         }
     }
+
+    /** Should do conflict check. */
+    interface ConflictCheck {
+        boolean shouldCheck(long latestSnapshot);
+    }
+
+    static ConflictCheck hasConflictChecked(@Nullable Long 
checkedLatestSnapshotId) {
+        return latestSnapshot -> !Objects.equals(latestSnapshot, 
checkedLatestSnapshotId);
+    }
+
+    static ConflictCheck noConflictCheck() {
+        return latestSnapshot -> false;
+    }
+
+    static ConflictCheck mustConflictCheck() {
+        return latestSnapshot -> true;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamTableCommit.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamTableCommit.java
index d2c91dc40..9e708d669 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamTableCommit.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamTableCommit.java
@@ -22,7 +22,6 @@ import org.apache.paimon.annotation.Public;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * A {@link TableCommit} for stream processing. You can use this class to 
commit multiple times.
@@ -33,17 +32,6 @@ import java.util.Set;
 @Public
 public interface StreamTableCommit extends TableCommit {
 
-    /**
-     * Filter committed commits. Return uncommitted identifiers. This method 
is used for failover
-     * cases.
-     *
-     * @deprecated Use {@link StreamTableCommit#filterAndCommit} to filter and 
commit all {@link
-     *     CommitMessage} in question with one method call, instead of calling 
this method first and
-     *     then call {@link StreamTableCommit#commit}.
-     */
-    @Deprecated
-    Set<Long> filterCommitted(Set<Long> commitIdentifiers);
-
     /**
      * Create a new commit. One commit may generate up to two snapshots, one 
for adding new files
      * and the other for compaction. There will be some expiration policies 
after commit:
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index c76b750a1..c63511ac4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -153,11 +153,6 @@ public class TableCommitImpl implements InnerTableCommit {
         return this;
     }
 
-    @Override
-    public Set<Long> filterCommitted(Set<Long> commitIdentifiers) {
-        return commit.filterCommitted(commitIdentifiers);
-    }
-
     @Override
     public void commit(List<CommitMessage> commitMessages) {
         checkCommitted();
@@ -198,13 +193,13 @@ public class TableCommitImpl implements InnerTableCommit {
     }
 
     public void commit(ManifestCommittable committable) {
-        commitMultiple(Collections.singletonList(committable));
+        commitMultiple(Collections.singletonList(committable), false);
     }
 
-    public void commitMultiple(List<ManifestCommittable> committables) {
+    public void commitMultiple(List<ManifestCommittable> committables, boolean 
checkAppendFiles) {
         if (overwritePartition == null) {
             for (ManifestCommittable committable : committables) {
-                commit.commit(committable, new HashMap<>());
+                commit.commit(committable, new HashMap<>(), checkAppendFiles);
             }
             if (!committables.isEmpty()) {
                 expire(committables.get(committables.size() - 1).identifier(), 
expireMainExecutor);
@@ -253,7 +248,7 @@ public class TableCommitImpl implements InnerTableCommit {
                         .collect(Collectors.toList());
         if (retryCommittables.size() > 0) {
             checkFilesExistence(retryCommittables);
-            commitMultiple(retryCommittables);
+            commitMultiple(retryCommittables, true);
         }
         return retryCommittables.size();
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 322a353f2..819b70d8a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -60,6 +60,7 @@ import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
+import static 
org.apache.paimon.operation.FileStoreCommitImpl.mustConflictCheck;
 import static org.apache.paimon.operation.FileStoreTestUtils.assertPathExists;
 import static 
org.apache.paimon.operation.FileStoreTestUtils.assertPathNotExists;
 import static org.apache.paimon.operation.FileStoreTestUtils.commitData;
@@ -729,7 +730,7 @@ public class FileDeletionTest {
                         Collections.emptyMap(),
                         Snapshot.CommitKind.APPEND,
                         store.snapshotManager().latestSnapshot(),
-                        null,
+                        mustConflictCheck(),
                         DEFAULT_MAIN_BRANCH,
                         null);
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index b0cec3f44..4524aba51 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -65,6 +65,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Predicate;
@@ -174,8 +175,7 @@ public class FileStoreCommitTest {
         Path firstSnapshotPath = 
snapshotManager.snapshotPath(Snapshot.FIRST_SNAPSHOT_ID);
         LocalFileIO.create().deleteQuietly(firstSnapshotPath);
         // this test succeeds if this call does not fail
-        store.newCommit(UUID.randomUUID().toString())
-                .filterCommitted(Collections.singletonList(new 
ManifestCommittable(999)));
+        
store.newCommit(UUID.randomUUID().toString()).filterCommitted(Collections.singleton(999L));
     }
 
     @Test
@@ -194,12 +194,7 @@ public class FileStoreCommitTest {
         }
 
         // all commit identifiers should be filtered out
-        List<ManifestCommittable> remaining =
-                store.newCommit(user)
-                        .filterCommitted(
-                                commitIdentifiers.stream()
-                                        .map(ManifestCommittable::new)
-                                        .collect(Collectors.toList()));
+        Set<Long> remaining = 
store.newCommit(user).filterCommitted(commitIdentifiers);
         assertThat(remaining).isEmpty();
     }
 
@@ -557,10 +552,18 @@ public class FileStoreCommitTest {
             assertThatThrownBy(
                             () ->
                                     store.newCommit()
-                                            .commit(committables.get(0), 
Collections.emptyMap()))
+                                            .commit(
+                                                    committables.get(0),
+                                                    Collections.emptyMap(),
+                                                    true))
                     .isInstanceOf(RuntimeException.class)
                     .hasMessageContaining("Give up committing.");
         }
+
+        // commit without check, should pass
+        for (int i = 0; i < 3; i++) {
+            store.newCommit().commit(committables.get(0), 
Collections.emptyMap());
+        }
     }
 
     @Test
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 94dc3cfe4..e29bcd34a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.SnapshotManager;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -49,14 +50,10 @@ import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.LongStream;
 
 import static 
org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL;
 import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
@@ -177,7 +174,6 @@ public class PartitionExpireTest {
         int preparedCommits = random.nextInt(20, 30);
 
         List<List<CommitMessage>> commitMessages = new ArrayList<>();
-        Set<Long> notCommitted = new HashSet<>();
         for (int i = 0; i < preparedCommits; i++) {
             // ensure the partition will be expired
             String f0 =
@@ -187,7 +183,6 @@ public class PartitionExpireTest {
             StreamTableWrite write = table.newWrite(commitUser);
             write.write(GenericRow.of(BinaryString.fromString(f0), 
BinaryString.fromString(f1)));
             commitMessages.add(write.prepareCommit(false, i));
-            notCommitted.add((long) i);
         }
 
         // commit a part of data and trigger partition expire
@@ -195,7 +190,6 @@ public class PartitionExpireTest {
         StreamTableCommit commit = table.newCommit(commitUser);
         for (int i = 0; i < successCommits - 2; i++) {
             commit.commit(i, commitMessages.get(i));
-            notCommitted.remove((long) i);
         }
 
         // we need two commits to trigger partition expire
@@ -210,21 +204,27 @@ public class PartitionExpireTest {
         commit.close();
         commit = table.newCommit(commitUser);
         commit.commit(successCommits - 2, commitMessages.get(successCommits - 
2));
-        notCommitted.remove((long) (successCommits - 2));
         Thread.sleep(5000);
         commit.commit(successCommits - 1, commitMessages.get(successCommits - 
1));
-        notCommitted.remove((long) (successCommits - 1));
-        commit.close();
 
         // check whether partition expire is triggered
-        Snapshot snapshot = table.snapshotManager().latestSnapshot();
+        SnapshotManager snapshotManager = table.snapshotManager();
+        Snapshot snapshot = snapshotManager.latestSnapshot();
         
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
 
-        // check filter
-        Set<Long> toBeFiltered =
-                LongStream.range(0, 
preparedCommits).boxed().collect(Collectors.toSet());
-        assertThat(commit.filterCommitted(toBeFiltered))
-                .containsExactlyInAnyOrderElementsOf(notCommitted);
+        // filterAndCommit
+        Map<Long, List<CommitMessage>> allCommits = new HashMap<>();
+        for (int i = 0; i < preparedCommits; i++) {
+            allCommits.put((long) i, commitMessages.get(i));
+        }
+
+        // no exception here
+        commit.filterAndCommit(allCommits);
+        commit.close();
+
+        // check commit last
+        assertThat(snapshotManager.latestSnapshot().commitIdentifier())
+                .isEqualTo(allCommits.size() - 1);
     }
 
     private List<String> read() throws IOException {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
index 872cb554f..748ec53cd 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
@@ -193,7 +193,8 @@ public class TestCommitThread extends Thread {
         while (true) {
             try {
                 if (shouldCheckFilter) {
-                    if 
(commit.filterCommitted(Collections.singletonList(committable)).isEmpty()) {
+                    if 
(commit.filterCommitted(Collections.singleton(committable.identifier()))
+                            .isEmpty()) {
                         break;
                     }
                 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index 620bcbbc4..17c1d209a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -97,7 +97,7 @@ public class StoreCommitter implements Committer<Committable, 
ManifestCommittabl
     @Override
     public void commit(List<ManifestCommittable> committables)
             throws IOException, InterruptedException {
-        commit.commitMultiple(committables);
+        commit.commitMultiple(committables, false);
         calcNumBytesAndRecordsOut(committables);
     }
 

Reply via email to