This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f7d9bf19c [core] Fix the conflict exception may exceed akka frame size
(#2213)
f7d9bf19c is described below
commit f7d9bf19ca56b2e05cb0d2be385126b8e971c897
Author: Aitozi <[email protected]>
AuthorDate: Tue Oct 31 14:31:37 2023 +0800
[core] Fix the conflict exception may exceed akka frame size (#2213)
---
.../paimon/operation/FileStoreCommitImpl.java | 106 +++++++++++++++------
1 file changed, 77 insertions(+), 29 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index b137798d3..11982c4b5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -893,13 +893,16 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
mergedEntries = ManifestEntry.mergeEntries(allEntries);
ManifestEntry.assertNoDelete(mergedEntries);
} catch (Throwable e) {
- LOG.warn("File deletion conflicts detected! Give up committing.",
e);
- throw createConflictException(
- "File deletion conflicts detected! Give up committing.",
- baseCommitUser,
- baseEntries,
- changes,
- e);
+ Pair<RuntimeException, RuntimeException> conflictException =
+ createConflictException(
+ "File deletion conflicts detected! Give up
committing.",
+ baseCommitUser,
+ baseEntries,
+ changes,
+ e,
+ 50);
+ LOG.warn("", conflictException.getLeft());
+ throw conflictException.getRight();
}
// fast exit for file store without keys
@@ -926,26 +929,37 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
ManifestEntry a = entries.get(i);
ManifestEntry b = entries.get(i + 1);
if (keyComparator.compare(a.file().maxKey(),
b.file().minKey()) >= 0) {
- throw createConflictException(
- "LSM conflicts detected! Give up committing.
Conflict files are:\n"
- + a.identifier().toString(pathFactory)
- + "\n"
- + b.identifier().toString(pathFactory),
- baseCommitUser,
- baseEntries,
- changes,
- null);
+ Pair<RuntimeException, RuntimeException> conflictException
=
+ createConflictException(
+ "LSM conflicts detected! Give up
committing. Conflict files are:\n"
+ +
a.identifier().toString(pathFactory)
+ + "\n"
+ +
b.identifier().toString(pathFactory),
+ baseCommitUser,
+ baseEntries,
+ changes,
+ null,
+ 50);
+
+ LOG.warn("", conflictException.getLeft());
+ throw conflictException.getRight();
}
}
}
}
- private RuntimeException createConflictException(
+ /**
+ * Construct detailed conflict exception. The returned pair is formed
of (full exception,
+ * simplified exception). The simplified exception is generated when the
number of entries is larger
+ * than the max limit; otherwise both elements are the full exception.
+ */
+ private Pair<RuntimeException, RuntimeException> createConflictException(
String message,
String baseCommitUser,
List<ManifestEntry> baseEntries,
List<ManifestEntry> changes,
- Throwable cause) {
+ Throwable cause,
+ int maxEntry) {
String possibleCauses =
String.join(
"\n",
@@ -982,17 +996,51 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
+ changes.stream()
.map(ManifestEntry::toString)
.collect(Collectors.joining("\n"));
- return new RuntimeException(
- message
- + "\n\n"
- + possibleCauses
- + "\n\n"
- + commitUserString
- + "\n\n"
- + baseEntriesString
- + "\n\n"
- + changesString,
- cause);
+
+ RuntimeException fullException =
+ new RuntimeException(
+ message
+ + "\n\n"
+ + possibleCauses
+ + "\n\n"
+ + commitUserString
+ + "\n\n"
+ + baseEntriesString
+ + "\n\n"
+ + changesString,
+ cause);
+
+ RuntimeException simplifiedException;
+ if (baseEntries.size() > maxEntry || changes.size() > maxEntry) {
+ baseEntriesString =
+ "Base entries are:\n"
+ + baseEntries.subList(0,
Math.min(baseEntries.size(), maxEntry))
+ .stream()
+ .map(ManifestEntry::toString)
+ .collect(Collectors.joining("\n"));
+ changesString =
+ "Changes are:\n"
+ + changes.subList(0, Math.min(changes.size(),
maxEntry)).stream()
+ .map(ManifestEntry::toString)
+ .collect(Collectors.joining("\n"));
+ simplifiedException =
+ new RuntimeException(
+ message
+ + "\n\n"
+ + possibleCauses
+ + "\n\n"
+ + commitUserString
+ + "\n\n"
+ + baseEntriesString
+ + "\n\n"
+ + changesString
+ + "\n\n"
+ + "The entry list above are not fully
displayed, please refer to taskmanager.log for more information.",
+ cause);
+ return Pair.of(fullException, simplifiedException);
+ } else {
+ return Pair.of(fullException, fullException);
+ }
}
private void cleanUpTmpManifests(