This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ab14a13c3 [core] Avoid 'trying to delete file' commit exception for expired partitions (#3945)
ab14a13c3 is described below
commit ab14a13c34db0d9eb6320391d041c1ef30c0d09a
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Aug 13 15:55:47 2024 +0800
[core] Avoid 'trying to delete file' commit exception for expired partitions (#3945)
---
.../paimon/utils/InternalRowPartitionComputer.java | 2 +-
.../utils/InternalRowPartitionComputerTest.java | 6 +-
.../java/org/apache/paimon/manifest/FileEntry.java | 9 ---
.../org/apache/paimon/mergetree/LookupFile.java | 4 +-
.../apache/paimon/operation/FileStoreCommit.java | 2 +
.../paimon/operation/FileStoreCommitImpl.java | 92 ++++++++++++++++------
.../apache/paimon/operation/PartitionExpire.java | 19 +++++
.../PartitionValuesTimeExpireStrategy.java | 4 +
.../apache/paimon/table/sink/TableCommitImpl.java | 1 +
.../paimon/operation/PartitionExpireTest.java | 73 ++++++++++++++---
10 files changed, 161 insertions(+), 51 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
index 3211a2e32..881f0f4d1 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
@@ -79,7 +79,7 @@ public class InternalRowPartitionComputer {
return partValues;
}
- public static String toSimpleString(
+ public static String partToSimpleString(
RowType partitionType, BinaryRow partition, String delimiter, int
maxLength) {
InternalRow.FieldGetter[] getters = partitionType.fieldGetters();
StringBuilder builder = new StringBuilder();
diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/InternalRowPartitionComputerTest.java b/paimon-common/src/test/java/org/apache/paimon/utils/InternalRowPartitionComputerTest.java
index 5f57dd6cf..771136ce9 100644
--- a/paimon-common/src/test/java/org/apache/paimon/utils/InternalRowPartitionComputerTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/InternalRowPartitionComputerTest.java
@@ -35,7 +35,7 @@ public class InternalRowPartitionComputerTest {
public void testPartitionToString() {
RowType rowType = RowType.of();
BinaryRow binaryRow = new BinaryRow(0);
- assertThat(InternalRowPartitionComputer.toSimpleString(rowType,
binaryRow, "-", 30))
+ assertThat(InternalRowPartitionComputer.partToSimpleString(rowType,
binaryRow, "-", 30))
.isEqualTo("");
rowType = RowType.of(DataTypes.STRING(), DataTypes.INT());
@@ -43,7 +43,7 @@ public class InternalRowPartitionComputerTest {
BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
writer.writeString(0, BinaryString.fromString("20240731"));
writer.writeInt(1, 10);
- assertThat(InternalRowPartitionComputer.toSimpleString(rowType,
binaryRow, "-", 30))
+ assertThat(InternalRowPartitionComputer.partToSimpleString(rowType,
binaryRow, "-", 30))
.isEqualTo("20240731-10");
rowType = RowType.of(DataTypes.STRING(), DataTypes.INT());
@@ -51,7 +51,7 @@ public class InternalRowPartitionComputerTest {
writer = new BinaryRowWriter(binaryRow);
writer.setNullAt(0);
writer.writeInt(1, 10);
- assertThat(InternalRowPartitionComputer.toSimpleString(rowType,
binaryRow, "-", 30))
+ assertThat(InternalRowPartitionComputer.partToSimpleString(rowType,
binaryRow, "-", 30))
.isEqualTo("null-10");
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index 3efc4ea19..145eb93a7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -161,13 +161,4 @@ public interface FileEntry {
manifestFiles,
manifestReadParallelism);
}
-
- static <T extends FileEntry> void assertNoDelete(Collection<T> entries) {
- for (T entry : entries) {
- Preconditions.checkState(
- entry.kind() != FileKind.DELETE,
- "Trying to delete file %s which is not previously added.",
- entry.fileName());
- }
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
index 097b18655..7469684da 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
@@ -40,7 +40,7 @@ import java.io.UncheckedIOException;
import java.time.Duration;
import static org.apache.paimon.mergetree.LookupUtils.fileKibiBytes;
-import static
org.apache.paimon.utils.InternalRowPartitionComputer.toSimpleString;
+import static
org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Lookup file for cache remote file to local. */
@@ -130,7 +130,7 @@ public class LookupFile {
if (partition.getFieldCount() == 0) {
return String.format("%s-%s", bucket, remoteFileName);
} else {
- String partitionString = toSimpleString(partitionType, partition,
"-", 20);
+ String partitionString = partToSimpleString(partitionType,
partition, "-", 20);
return String.format("%s-%s-%s", partitionString, bucket,
remoteFileName);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index a63e2b733..e15225793 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -37,6 +37,8 @@ public interface FileStoreCommit extends AutoCloseable {
FileStoreCommit ignoreEmptyCommit(boolean ignoreEmptyCommit);
+ FileStoreCommit withPartitionExpire(PartitionExpire partitionExpire);
+
/** Find out which committables need to be retried when recovering from
the failure. */
List<ManifestCommittable> filterCommitted(List<ManifestCommittable>
committables);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 4d72efe9f..3a8d7195c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -80,6 +80,7 @@ import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETIO
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
+import static
org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
/**
* Default implementation of {@link FileStoreCommit}.
@@ -125,15 +126,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private final String branchName;
@Nullable private final Integer manifestReadParallelism;
private final List<CommitCallback> commitCallbacks;
+ private final StatsFileHandler statsFileHandler;
+ private final BucketMode bucketMode;
@Nullable private Lock lock;
private boolean ignoreEmptyCommit;
-
private CommitMetrics commitMetrics;
-
- private final StatsFileHandler statsFileHandler;
-
- private final BucketMode bucketMode;
+ @Nullable private PartitionExpire partitionExpire;
public FileStoreCommitImpl(
FileIO fileIO,
@@ -198,6 +197,12 @@ public class FileStoreCommitImpl implements FileStoreCommit {
return this;
}
+ @Override
+ public FileStoreCommit withPartitionExpire(PartitionExpire
partitionExpire) {
+ this.partitionExpire = partitionExpire;
+ return this;
+ }
+
@Override
public List<ManifestCommittable> filterCommitted(List<ManifestCommittable>
committables) {
// nothing to filter, fast exit
@@ -1055,24 +1060,30 @@ public class FileStoreCommitImpl implements FileStoreCommit {
List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries);
allEntries.addAll(changes);
- Collection<SimpleFileEntry> mergedEntries;
+ java.util.function.Consumer<Throwable> conflictHandler =
+ e -> {
+ Pair<RuntimeException, RuntimeException> conflictException
=
+ createConflictException(
+ "File deletion conflicts detected! Give up
committing.",
+ baseCommitUser,
+ baseEntries,
+ changes,
+ e,
+ 50);
+ LOG.warn("", conflictException.getLeft());
+ throw conflictException.getRight();
+ };
+
+ Collection<SimpleFileEntry> mergedEntries = null;
try {
// merge manifest entries and also check if the files we want to
delete are still there
mergedEntries = FileEntry.mergeEntries(allEntries);
- FileEntry.assertNoDelete(mergedEntries);
} catch (Throwable e) {
- Pair<RuntimeException, RuntimeException> conflictException =
- createConflictException(
- "File deletion conflicts detected! Give up
committing.",
- baseCommitUser,
- baseEntries,
- changes,
- e,
- 50);
- LOG.warn("", conflictException.getLeft());
- throw conflictException.getRight();
+ conflictHandler.accept(e);
}
+ assertNoDelete(mergedEntries, conflictHandler);
+
// fast exit for file store without keys
if (keyComparator == null) {
return;
@@ -1116,6 +1127,43 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
}
+ private void assertNoDelete(
+ Collection<SimpleFileEntry> mergedEntries,
+ java.util.function.Consumer<Throwable> conflictHandler) {
+ try {
+ for (SimpleFileEntry entry : mergedEntries) {
+ Preconditions.checkState(
+ entry.kind() != FileKind.DELETE,
+ "Trying to delete file %s which is not previously
added.",
+ entry.fileName());
+ }
+ } catch (Throwable e) {
+ if (partitionExpire != null &&
partitionExpire.isValueExpiration()) {
+ Set<BinaryRow> deletedPartitions = new HashSet<>();
+ for (SimpleFileEntry entry : mergedEntries) {
+ if (entry.kind() == FileKind.DELETE) {
+ deletedPartitions.add(entry.partition());
+ }
+ }
+ if (partitionExpire.isValueAllExpired(deletedPartitions)) {
+ List<String> expiredPartitions =
+ deletedPartitions.stream()
+ .map(
+ partition ->
+ partToSimpleString(
+ partitionType,
partition, "-", 200))
+ .collect(Collectors.toList());
+ throw new RuntimeException(
+ "You are writing data to expired partitions, and
you can filter this data to avoid job failover."
+ + " Otherwise, continuous expired records
will cause the job to failover restart continuously."
+ + " Expired partitions are: "
+ + expiredPartitions);
+ }
+ }
+ conflictHandler.accept(e);
+ }
+ }
+
/**
* Construct detailed conflict exception. The returned exception is formed
of (full exception,
* simplified exception), The simplified exception is generated when the
entry length is larger
@@ -1134,19 +1182,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
"Don't panic!",
"Conflicts during commits are normal and this failure
is intended to resolve the conflicts.",
"Conflicts are mainly caused by the following
scenarios:",
- "1. Your job is suffering from back-pressuring.",
- " There are too many snapshots waiting to be
committed "
- + "and an exception occurred during the commit
procedure "
- + "(most probably due to checkpoint timeout).",
- " See
https://paimon.apache.org/docs/master/maintenance/write-performance/ "
- + "for how to improve writing performance.",
- "2. Multiple jobs are writing into the same partition
at the same time, "
+ "1. Multiple jobs are writing into the same partition
at the same time, "
+ "or you use STATEMENT SET to execute
multiple INSERT statements into the same Paimon table.",
" You'll probably see different base commit user and
current commit user below.",
" You can use "
+
"https://paimon.apache.org/docs/master/maintenance/dedicated-compaction#dedicated-compaction-job"
+ " to support multiple writing.",
- "3. You're recovering from an old savepoint, or you're
creating multiple jobs from a savepoint.",
+ "2. You're recovering from an old savepoint, or you're
creating multiple jobs from a savepoint.",
" The job will fail continuously in this scenario to
protect metadata from corruption.",
" You can either recover from the latest savepoint, "
+ "or you can revert the table to the snapshot
corresponding to the old savepoint.");
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index 2f5ca780c..64074a4c1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -19,9 +19,11 @@
package org.apache.paimon.operation;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.partition.PartitionExpireStrategy;
+import org.apache.paimon.partition.PartitionValuesTimeExpireStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +33,7 @@ import javax.annotation.Nullable;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -86,6 +89,22 @@ public class PartitionExpire {
return expire(LocalDateTime.now(), commitIdentifier);
}
+ public boolean isValueExpiration() {
+ return strategy instanceof PartitionValuesTimeExpireStrategy;
+ }
+
+ public boolean isValueAllExpired(Collection<BinaryRow> partitions) {
+ PartitionValuesTimeExpireStrategy valuesStrategy =
+ (PartitionValuesTimeExpireStrategy) strategy;
+ LocalDateTime expireDateTime =
LocalDateTime.now().minus(expirationTime);
+ for (BinaryRow partition : partitions) {
+ if (!valuesStrategy.isExpired(expireDateTime, partition)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
@VisibleForTesting
void setLastCheck(LocalDateTime time) {
lastCheck = time;
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
index 37cbed530..80ae633fd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
@@ -61,6 +61,10 @@ public class PartitionValuesTimeExpireStrategy extends PartitionExpireStrategy {
.readPartitionEntries();
}
+ public boolean isExpired(LocalDateTime expireDateTime, BinaryRow
partition) {
+ return new
PartitionValuesTimePredicate(expireDateTime).test(partition);
+ }
+
/** The expired partition predicate uses the date-format value of the
partition. */
private class PartitionValuesTimePredicate implements PartitionPredicate {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 5e25fcfd0..b4f8fa47d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -106,6 +106,7 @@ public class TableCommitImpl implements InnerTableCommit {
commit.withLock(lock);
if (partitionExpire != null) {
partitionExpire.withLock(lock);
+ commit.withPartitionExpire(partitionExpire);
}
this.commit = commit;
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index e29bcd34a..931bac59c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -24,11 +24,15 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.TableCommitImpl;
@@ -55,6 +59,8 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
import static
org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL;
import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
import static org.apache.paimon.CoreOptions.PARTITION_TIMESTAMP_FORMATTER;
@@ -83,8 +89,8 @@ public class PartitionExpireTest {
schemaManager.createTable(
new Schema(
RowType.of(VarCharType.STRING_TYPE).getFields(),
- Collections.emptyList(),
- Collections.emptyList(),
+ emptyList(),
+ emptyList(),
Collections.singletonMap(
PARTITION_EXPIRATION_TIME.key(), "1 d"),
"")))
@@ -98,8 +104,8 @@ public class PartitionExpireTest {
schemaManager.createTable(
new Schema(
RowType.of(VarCharType.STRING_TYPE,
VarCharType.STRING_TYPE).getFields(),
- Collections.singletonList("f0"),
- Collections.emptyList(),
+ singletonList("f0"),
+ emptyList(),
Collections.emptyMap(),
""));
table = FileStoreTableFactory.create(LocalFileIO.create(), path);
@@ -121,8 +127,8 @@ public class PartitionExpireTest {
schemaManager.createTable(
new Schema(
RowType.of(VarCharType.STRING_TYPE,
VarCharType.STRING_TYPE).getFields(),
- Collections.singletonList("f0"),
- Collections.emptyList(),
+ singletonList("f0"),
+ emptyList(),
Collections.emptyMap(),
""));
table = FileStoreTableFactory.create(LocalFileIO.create(), path);
@@ -158,8 +164,8 @@ public class PartitionExpireTest {
schemaManager.createTable(
new Schema(
RowType.of(VarCharType.STRING_TYPE,
VarCharType.STRING_TYPE).getFields(),
- Collections.singletonList("f0"),
- Collections.emptyList(),
+ singletonList("f0"),
+ emptyList(),
Collections.emptyMap(),
""));
@@ -227,6 +233,44 @@ public class PartitionExpireTest {
.isEqualTo(allCommits.size() - 1);
}
+ @Test
+ public void testDeleteExpiredPartition() throws Exception {
+ SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(),
path);
+ schemaManager.createTable(
+ new Schema(
+ RowType.of(VarCharType.STRING_TYPE,
VarCharType.STRING_TYPE).getFields(),
+ singletonList("f0"),
+ emptyList(),
+ Collections.emptyMap(),
+ ""));
+ table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+ table = newExpireTable();
+
+ List<CommitMessage> commitMessages = write("20230101", "11");
+ write("20230105", "51");
+
+ PartitionExpire expire = newExpire();
+ expire.setLastCheck(date(1));
+ expire.expire(date(5), Long.MAX_VALUE);
+ assertThat(read()).containsExactlyInAnyOrder("20230105:51");
+
+ TableCommitImpl commit = table.newCommit("");
+ CommitMessageImpl message = (CommitMessageImpl) commitMessages.get(0);
+ DataFileMeta file = message.newFilesIncrement().newFiles().get(0);
+ CommitMessageImpl newMessage =
+ new CommitMessageImpl(
+ message.partition(),
+ message.bucket(),
+ new DataIncrement(emptyList(), emptyList(),
emptyList()),
+ new CompactIncrement(singletonList(file), emptyList(),
emptyList()));
+
+ assertThatThrownBy(() -> commit.commit(0L, singletonList(newMessage)))
+ .hasMessage(
+ "You are writing data to expired partitions, and you
can filter "
+ + "this data to avoid job failover. Otherwise,
continuous expired records will cause the"
+ + " job to failover restart continuously.
Expired partitions are: [20230101]");
+ }
+
private List<String> read() throws IOException {
List<String> ret = new ArrayList<>();
table.newRead()
@@ -239,21 +283,28 @@ public class PartitionExpireTest {
return LocalDateTime.of(LocalDate.of(2023, 1, day), LocalTime.MIN);
}
- private void write(String f0, String f1) throws Exception {
+ private List<CommitMessage> write(String f0, String f1) throws Exception {
StreamTableWrite write =
table.copy(Collections.singletonMap(WRITE_ONLY.key(),
"true")).newWrite("");
write.write(GenericRow.of(BinaryString.fromString(f0),
BinaryString.fromString(f1)));
TableCommitImpl commit = table.newCommit("");
- commit.commit(0, write.prepareCommit(true, 0));
+ List<CommitMessage> commitMessages = write.prepareCommit(true, 0);
+ commit.commit(0, commitMessages);
write.close();
commit.close();
+
+ return commitMessages;
}
private PartitionExpire newExpire() {
+ return newExpireTable().store().newPartitionExpire("");
+ }
+
+ private FileStoreTable newExpireTable() {
Map<String, String> options = new HashMap<>();
options.put(PARTITION_EXPIRATION_TIME.key(), "2 d");
options.put(CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL.key(), "1
d");
options.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(),
"yyyyMMdd");
- return table.copy(options).store().newPartitionExpire("");
+ return table.copy(options);
}
}