This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3b80e143e [core] FileStoreCommit should check append files only in recovering (#3112)
3b80e143e is described below
commit 3b80e143e21a515e1f6a7f9d6853fbabc1384849
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 1 17:32:25 2024 +0800
[core] FileStoreCommit should check append files only in recovering (#3112)
---
.../apache/paimon/operation/FileStoreCommit.java | 19 +++------
.../paimon/operation/FileStoreCommitImpl.java | 45 +++++++++++++++++-----
.../paimon/table/sink/StreamTableCommit.java | 12 ------
.../apache/paimon/table/sink/TableCommitImpl.java | 13 ++-----
.../apache/paimon/operation/FileDeletionTest.java | 3 +-
.../paimon/operation/FileStoreCommitTest.java | 21 +++++-----
.../paimon/operation/PartitionExpireTest.java | 32 +++++++--------
.../apache/paimon/operation/TestCommitThread.java | 3 +-
.../apache/paimon/flink/sink/StoreCommitter.java | 2 +-
9 files changed, 78 insertions(+), 72 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index 7151f2512..f43308c30 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -29,7 +29,6 @@ import org.apache.paimon.utils.FileStorePathFactory;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
/** Commit operation which provides commit and overwrite. */
public interface FileStoreCommit {
@@ -39,24 +38,18 @@ public interface FileStoreCommit {
FileStoreCommit ignoreEmptyCommit(boolean ignoreEmptyCommit);
- /** Find out which manifest committable need to be retried when recovering from the failure. */
- default List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committableList) {
- Set<Long> identifiers =
- filterCommitted(
- committableList.stream()
- .map(ManifestCommittable::identifier)
- .collect(Collectors.toSet()));
- return committableList.stream()
- .filter(m -> identifiers.contains(m.identifier()))
- .collect(Collectors.toList());
- }
-
/** Find out which commit identifier need to be retried when recovering
from the failure. */
Set<Long> filterCommitted(Set<Long> commitIdentifiers);
/** Commit from manifest committable. */
void commit(ManifestCommittable committable, Map<String, String>
properties);
+ /** Commit from manifest committable with checkAppendFiles. */
+ void commit(
+ ManifestCommittable committable,
+ Map<String, String> properties,
+ boolean checkAppendFiles);
+
/**
* Overwrite from manifest committable and partition.
*
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 3f2ad87ac..19819357c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -205,6 +205,14 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
@Override
public void commit(ManifestCommittable committable, Map<String, String>
properties) {
+ commit(committable, properties, false);
+ }
+
+ @Override
+ public void commit(
+ ManifestCommittable committable,
+ Map<String, String> properties,
+ boolean checkAppendFiles) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to commit\n" + committable.toString());
}
@@ -244,7 +252,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
// This optimization is mainly used to decrease the number of
times we read from
// files.
latestSnapshot = snapshotManager.latestSnapshot(branchName);
- if (latestSnapshot != null) {
+ if (latestSnapshot != null && checkAppendFiles) {
// it is possible that some partitions only have compact
changes,
// so we need to contain all changes
baseEntries.addAll(
@@ -264,7 +272,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
committable.watermark(),
committable.logOffsets(),
Snapshot.CommitKind.APPEND,
- safeLatestSnapshotId,
+ noConflictCheck(),
branchName,
null);
generatedSnapshot += 1;
@@ -299,7 +307,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
committable.watermark(),
committable.logOffsets(),
Snapshot.CommitKind.COMPACT,
- safeLatestSnapshotId,
+ hasConflictChecked(safeLatestSnapshotId),
branchName,
null);
generatedSnapshot += 1;
@@ -447,7 +455,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
committable.watermark(),
committable.logOffsets(),
Snapshot.CommitKind.COMPACT,
- null,
+ mustConflictCheck(),
branchName,
null);
generatedSnapshot += 1;
@@ -543,7 +551,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
null,
Collections.emptyMap(),
Snapshot.CommitKind.ANALYZE,
- null,
+ noConflictCheck(),
branchName,
statsFileName);
}
@@ -641,7 +649,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
Snapshot.CommitKind commitKind,
- @Nullable Long safeLatestSnapshotId,
+ ConflictCheck conflictCheck,
String branchName,
@Nullable String statsFileName) {
int cnt = 0;
@@ -657,7 +665,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
logOffsets,
commitKind,
latestSnapshot,
- safeLatestSnapshotId,
+ conflictCheck,
branchName,
statsFileName)) {
break;
@@ -719,7 +727,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
logOffsets,
Snapshot.CommitKind.OVERWRITE,
latestSnapshot,
- null,
+ mustConflictCheck(),
branchName,
null)) {
break;
@@ -738,7 +746,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
Map<Integer, Long> logOffsets,
Snapshot.CommitKind commitKind,
@Nullable Snapshot latestSnapshot,
- @Nullable Long safeLatestSnapshotId,
+ ConflictCheck conflictCheck,
String branchName,
@Nullable String newStatsFileName) {
long newSnapshotId =
@@ -759,7 +767,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
}
}
- if (latestSnapshot != null && !Objects.equals(latestSnapshot.id(),
safeLatestSnapshotId)) {
+ if (latestSnapshot != null &&
conflictCheck.shouldCheck(latestSnapshot.id())) {
// latestSnapshotId is different from the snapshot id we've
checked for conflicts,
// so we have to check again
noConflictsOrFail(latestSnapshot.commitUser(), latestSnapshot,
tableFiles);
@@ -1209,4 +1217,21 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
return Objects.hash(partition, bucket, level);
}
}
+
+ /** Should do conflict check. */
+ interface ConflictCheck {
+ boolean shouldCheck(long latestSnapshot);
+ }
+
+ static ConflictCheck hasConflictChecked(@Nullable Long
checkedLatestSnapshotId) {
+ return latestSnapshot -> !Objects.equals(latestSnapshot,
checkedLatestSnapshotId);
+ }
+
+ static ConflictCheck noConflictCheck() {
+ return latestSnapshot -> false;
+ }
+
+ static ConflictCheck mustConflictCheck() {
+ return latestSnapshot -> true;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamTableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamTableCommit.java
index d2c91dc40..9e708d669 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamTableCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamTableCommit.java
@@ -22,7 +22,6 @@ import org.apache.paimon.annotation.Public;
import java.util.List;
import java.util.Map;
-import java.util.Set;
/**
* A {@link TableCommit} for stream processing. You can use this class to
commit multiple times.
@@ -33,17 +32,6 @@ import java.util.Set;
@Public
public interface StreamTableCommit extends TableCommit {
- /**
- * Filter committed commits. Return uncommitted identifiers. This method
is used for failover
- * cases.
- *
- * @deprecated Use {@link StreamTableCommit#filterAndCommit} to filter and
commit all {@link
- * CommitMessage} in question with one method call, instead of calling
this method first and
- * then call {@link StreamTableCommit#commit}.
- */
- @Deprecated
- Set<Long> filterCommitted(Set<Long> commitIdentifiers);
-
/**
* Create a new commit. One commit may generate up to two snapshots, one
for adding new files
* and the other for compaction. There will be some expiration policies
after commit:
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index c76b750a1..c63511ac4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -153,11 +153,6 @@ public class TableCommitImpl implements InnerTableCommit {
return this;
}
- @Override
- public Set<Long> filterCommitted(Set<Long> commitIdentifiers) {
- return commit.filterCommitted(commitIdentifiers);
- }
-
@Override
public void commit(List<CommitMessage> commitMessages) {
checkCommitted();
@@ -198,13 +193,13 @@ public class TableCommitImpl implements InnerTableCommit {
}
public void commit(ManifestCommittable committable) {
- commitMultiple(Collections.singletonList(committable));
+ commitMultiple(Collections.singletonList(committable), false);
}
- public void commitMultiple(List<ManifestCommittable> committables) {
+ public void commitMultiple(List<ManifestCommittable> committables, boolean
checkAppendFiles) {
if (overwritePartition == null) {
for (ManifestCommittable committable : committables) {
- commit.commit(committable, new HashMap<>());
+ commit.commit(committable, new HashMap<>(), checkAppendFiles);
}
if (!committables.isEmpty()) {
expire(committables.get(committables.size() - 1).identifier(),
expireMainExecutor);
@@ -253,7 +248,7 @@ public class TableCommitImpl implements InnerTableCommit {
.collect(Collectors.toList());
if (retryCommittables.size() > 0) {
checkFilesExistence(retryCommittables);
- commitMultiple(retryCommittables);
+ commitMultiple(retryCommittables, true);
}
return retryCommittables.size();
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 322a353f2..819b70d8a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -60,6 +60,7 @@ import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
+import static
org.apache.paimon.operation.FileStoreCommitImpl.mustConflictCheck;
import static org.apache.paimon.operation.FileStoreTestUtils.assertPathExists;
import static
org.apache.paimon.operation.FileStoreTestUtils.assertPathNotExists;
import static org.apache.paimon.operation.FileStoreTestUtils.commitData;
@@ -729,7 +730,7 @@ public class FileDeletionTest {
Collections.emptyMap(),
Snapshot.CommitKind.APPEND,
store.snapshotManager().latestSnapshot(),
- null,
+ mustConflictCheck(),
DEFAULT_MAIN_BRANCH,
null);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index b0cec3f44..4524aba51 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -65,6 +65,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
@@ -174,8 +175,7 @@ public class FileStoreCommitTest {
Path firstSnapshotPath =
snapshotManager.snapshotPath(Snapshot.FIRST_SNAPSHOT_ID);
LocalFileIO.create().deleteQuietly(firstSnapshotPath);
// this test succeeds if this call does not fail
- store.newCommit(UUID.randomUUID().toString())
- .filterCommitted(Collections.singletonList(new
ManifestCommittable(999)));
+
store.newCommit(UUID.randomUUID().toString()).filterCommitted(Collections.singleton(999L));
}
@Test
@@ -194,12 +194,7 @@ public class FileStoreCommitTest {
}
// all commit identifiers should be filtered out
- List<ManifestCommittable> remaining =
- store.newCommit(user)
- .filterCommitted(
- commitIdentifiers.stream()
- .map(ManifestCommittable::new)
- .collect(Collectors.toList()));
+ Set<Long> remaining =
store.newCommit(user).filterCommitted(commitIdentifiers);
assertThat(remaining).isEmpty();
}
@@ -557,10 +552,18 @@ public class FileStoreCommitTest {
assertThatThrownBy(
() ->
store.newCommit()
- .commit(committables.get(0),
Collections.emptyMap()))
+ .commit(
+ committables.get(0),
+ Collections.emptyMap(),
+ true))
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Give up committing.");
}
+
+ // commit without check, should pass
+ for (int i = 0; i < 3; i++) {
+ store.newCommit().commit(committables.get(0),
Collections.emptyMap());
+ }
}
@Test
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 94dc3cfe4..e29bcd34a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.SnapshotManager;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -49,14 +50,10 @@ import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.LongStream;
import static
org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL;
import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
@@ -177,7 +174,6 @@ public class PartitionExpireTest {
int preparedCommits = random.nextInt(20, 30);
List<List<CommitMessage>> commitMessages = new ArrayList<>();
- Set<Long> notCommitted = new HashSet<>();
for (int i = 0; i < preparedCommits; i++) {
// ensure the partition will be expired
String f0 =
@@ -187,7 +183,6 @@ public class PartitionExpireTest {
StreamTableWrite write = table.newWrite(commitUser);
write.write(GenericRow.of(BinaryString.fromString(f0),
BinaryString.fromString(f1)));
commitMessages.add(write.prepareCommit(false, i));
- notCommitted.add((long) i);
}
// commit a part of data and trigger partition expire
@@ -195,7 +190,6 @@ public class PartitionExpireTest {
StreamTableCommit commit = table.newCommit(commitUser);
for (int i = 0; i < successCommits - 2; i++) {
commit.commit(i, commitMessages.get(i));
- notCommitted.remove((long) i);
}
// we need two commits to trigger partition expire
@@ -210,21 +204,27 @@ public class PartitionExpireTest {
commit.close();
commit = table.newCommit(commitUser);
commit.commit(successCommits - 2, commitMessages.get(successCommits -
2));
- notCommitted.remove((long) (successCommits - 2));
Thread.sleep(5000);
commit.commit(successCommits - 1, commitMessages.get(successCommits -
1));
- notCommitted.remove((long) (successCommits - 1));
- commit.close();
// check whether partition expire is triggered
- Snapshot snapshot = table.snapshotManager().latestSnapshot();
+ SnapshotManager snapshotManager = table.snapshotManager();
+ Snapshot snapshot = snapshotManager.latestSnapshot();
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
- // check filter
- Set<Long> toBeFiltered =
- LongStream.range(0,
preparedCommits).boxed().collect(Collectors.toSet());
- assertThat(commit.filterCommitted(toBeFiltered))
- .containsExactlyInAnyOrderElementsOf(notCommitted);
+ // filterAndCommit
+ Map<Long, List<CommitMessage>> allCommits = new HashMap<>();
+ for (int i = 0; i < preparedCommits; i++) {
+ allCommits.put((long) i, commitMessages.get(i));
+ }
+
+ // no exception here
+ commit.filterAndCommit(allCommits);
+ commit.close();
+
+ // check commit last
+ assertThat(snapshotManager.latestSnapshot().commitIdentifier())
+ .isEqualTo(allCommits.size() - 1);
}
private List<String> read() throws IOException {
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
index 872cb554f..748ec53cd 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
@@ -193,7 +193,8 @@ public class TestCommitThread extends Thread {
while (true) {
try {
if (shouldCheckFilter) {
- if
(commit.filterCommitted(Collections.singletonList(committable)).isEmpty()) {
+ if
(commit.filterCommitted(Collections.singleton(committable.identifier()))
+ .isEmpty()) {
break;
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index 620bcbbc4..17c1d209a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -97,7 +97,7 @@ public class StoreCommitter implements Committer<Committable,
ManifestCommittabl
@Override
public void commit(List<ManifestCommittable> committables)
throws IOException, InterruptedException {
- commit.commitMultiple(committables);
+ commit.commitMultiple(committables, false);
calcNumBytesAndRecordsOut(committables);
}