This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6cacf03c18 [hotfix] Refactor code in FileStoreCommitImpl
6cacf03c18 is described below
commit 6cacf03c18148fc82b3f96d5d87b3945c9612bdc
Author: JingsongLi <[email protected]>
AuthorDate: Mon Jun 9 12:07:42 2025 +0800
[hotfix] Refactor code in FileStoreCommitImpl
---
.../paimon/operation/FileStoreCommitImpl.java | 73 ++++++++++++----------
.../org/apache/paimon/schema/SchemaManager.java | 4 ++
2 files changed, 43 insertions(+), 34 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index f85bf6b2c7..852776d2fe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -713,22 +713,22 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
if (!commitMessages.isEmpty()) {
List<String> msg = new ArrayList<>();
- if (appendTableFiles.size() > 0) {
+ if (!appendTableFiles.isEmpty()) {
msg.add(appendTableFiles.size() + " append table files");
}
- if (appendChangelog.size() > 0) {
+ if (!appendChangelog.isEmpty()) {
msg.add(appendChangelog.size() + " append Changelogs");
}
- if (compactTableFiles.size() > 0) {
+ if (!compactTableFiles.isEmpty()) {
msg.add(compactTableFiles.size() + " compact table files");
}
- if (compactChangelog.size() > 0) {
+ if (!compactChangelog.isEmpty()) {
msg.add(compactChangelog.size() + " compact Changelogs");
}
- if (appendHashIndexFiles.size() > 0) {
+ if (!appendHashIndexFiles.isEmpty()) {
msg.add(appendHashIndexFiles.size() + " append hash index
files");
}
- if (compactDvIndexFiles.size() > 0) {
+ if (!compactDvIndexFiles.isEmpty()) {
msg.add(compactDvIndexFiles.size() + " compact dv index
files");
}
LOG.info("Finished collecting changes, including: {}",
String.join(", ", msg));
@@ -989,7 +989,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
indexManifestFile.writeIndexFiles(oldIndexManifest,
indexFiles, bucketMode);
}
- long latestSchemaId = schemaManager.latest().get().id();
+ long latestSchemaId =
+ schemaManager
+ .latestOrThrow("Cannot get latest schema for table
" + tableName)
+ .id();
// write new stats or inherit from the previous snapshot
String statsFileName = null;
@@ -1261,8 +1264,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
baseCommitUser,
baseEntries,
changes,
- e,
- 50);
+ e);
LOG.warn("", conflictException.getLeft());
throw conflictException.getRight();
};
@@ -1310,8 +1312,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
baseCommitUser,
baseEntries,
changes,
- null,
- 50);
+ null);
LOG.warn("", conflictException.getLeft());
throw conflictException.getRight();
@@ -1332,29 +1333,33 @@ public class FileStoreCommitImpl implements FileStoreCommit {
tableName);
}
} catch (Throwable e) {
- if (partitionExpire != null &&
partitionExpire.isValueExpiration()) {
- Set<BinaryRow> deletedPartitions = new HashSet<>();
- for (SimpleFileEntry entry : mergedEntries) {
- if (entry.kind() == FileKind.DELETE) {
- deletedPartitions.add(entry.partition());
- }
- }
- if (partitionExpire.isValueAllExpired(deletedPartitions)) {
- List<String> expiredPartitions =
- deletedPartitions.stream()
- .map(
- partition ->
- partToSimpleString(
- partitionType,
partition, "-", 200))
- .collect(Collectors.toList());
- throw new RuntimeException(
- "You are writing data to expired partitions, and
you can filter this data to avoid job failover."
- + " Otherwise, continuous expired records
will cause the job to failover restart continuously."
- + " Expired partitions are: "
- + expiredPartitions);
+ assertConflictForPartitionExpire(mergedEntries);
+ conflictHandler.accept(e);
+ }
+ }
+
+ private void assertConflictForPartitionExpire(Collection<SimpleFileEntry>
mergedEntries) {
+ if (partitionExpire != null && partitionExpire.isValueExpiration()) {
+ Set<BinaryRow> deletedPartitions = new HashSet<>();
+ for (SimpleFileEntry entry : mergedEntries) {
+ if (entry.kind() == FileKind.DELETE) {
+ deletedPartitions.add(entry.partition());
}
}
- conflictHandler.accept(e);
+ if (partitionExpire.isValueAllExpired(deletedPartitions)) {
+ List<String> expiredPartitions =
+ deletedPartitions.stream()
+ .map(
+ partition ->
+ partToSimpleString(
+ partitionType,
partition, "-", 200))
+ .collect(Collectors.toList());
+ throw new RuntimeException(
+ "You are writing data to expired partitions, and you
can filter this data to avoid job failover."
+ + " Otherwise, continuous expired records will
cause the job to failover restart continuously."
+ + " Expired partitions are: "
+ + expiredPartitions);
+ }
}
}
@@ -1368,8 +1373,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
String baseCommitUser,
List<SimpleFileEntry> baseEntries,
List<SimpleFileEntry> changes,
- Throwable cause,
- int maxEntry) {
+ Throwable cause) {
String possibleCauses =
String.join(
"\n",
@@ -1414,6 +1418,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
cause);
RuntimeException simplifiedException;
+ int maxEntry = 50;
if (baseEntries.size() > maxEntry || changes.size() > maxEntry) {
baseEntriesString =
"Base entries are:\n"
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 21484cfc84..36a18e26a8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -119,6 +119,10 @@ public class SchemaManager implements Serializable {
}
}
+ public TableSchema latestOrThrow(String message) {
+ return latest().orElseThrow(() -> new RuntimeException(message));
+ }
+
public long earliestCreationTime() {
try {
long earliest = 0;