This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f7d9bf19c [core] Fix the conflict exception may exceed akka frame size 
(#2213)
f7d9bf19c is described below

commit f7d9bf19ca56b2e05cb0d2be385126b8e971c897
Author: Aitozi <[email protected]>
AuthorDate: Tue Oct 31 14:31:37 2023 +0800

    [core] Fix the conflict exception may exceed akka frame size (#2213)
---
 .../paimon/operation/FileStoreCommitImpl.java      | 106 +++++++++++++++------
 1 file changed, 77 insertions(+), 29 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index b137798d3..11982c4b5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -893,13 +893,16 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
             mergedEntries = ManifestEntry.mergeEntries(allEntries);
             ManifestEntry.assertNoDelete(mergedEntries);
         } catch (Throwable e) {
-            LOG.warn("File deletion conflicts detected! Give up committing.", 
e);
-            throw createConflictException(
-                    "File deletion conflicts detected! Give up committing.",
-                    baseCommitUser,
-                    baseEntries,
-                    changes,
-                    e);
+            Pair<RuntimeException, RuntimeException> conflictException =
+                    createConflictException(
+                            "File deletion conflicts detected! Give up 
committing.",
+                            baseCommitUser,
+                            baseEntries,
+                            changes,
+                            e,
+                            50);
+            LOG.warn("", conflictException.getLeft());
+            throw conflictException.getRight();
         }
 
         // fast exit for file store without keys
@@ -926,26 +929,37 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                 ManifestEntry a = entries.get(i);
                 ManifestEntry b = entries.get(i + 1);
                 if (keyComparator.compare(a.file().maxKey(), 
b.file().minKey()) >= 0) {
-                    throw createConflictException(
-                            "LSM conflicts detected! Give up committing. 
Conflict files are:\n"
-                                    + a.identifier().toString(pathFactory)
-                                    + "\n"
-                                    + b.identifier().toString(pathFactory),
-                            baseCommitUser,
-                            baseEntries,
-                            changes,
-                            null);
+                    Pair<RuntimeException, RuntimeException> conflictException 
=
+                            createConflictException(
+                                    "LSM conflicts detected! Give up 
committing. Conflict files are:\n"
+                                            + 
a.identifier().toString(pathFactory)
+                                            + "\n"
+                                            + 
b.identifier().toString(pathFactory),
+                                    baseCommitUser,
+                                    baseEntries,
+                                    changes,
+                                    null,
+                                    50);
+
+                    LOG.warn("", conflictException.getLeft());
+                    throw conflictException.getRight();
                 }
             }
         }
     }
 
-    private RuntimeException createConflictException(
+    /**
+     * Construct detailed conflict exception. The returned exception is formed 
of (full exception,
+     * simplified exception), The simplified exception is generated when the 
entry length is larger
+     * than the max limit.
+     */
+    private Pair<RuntimeException, RuntimeException> createConflictException(
             String message,
             String baseCommitUser,
             List<ManifestEntry> baseEntries,
             List<ManifestEntry> changes,
-            Throwable cause) {
+            Throwable cause,
+            int maxEntry) {
         String possibleCauses =
                 String.join(
                         "\n",
@@ -982,17 +996,51 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                         + changes.stream()
                                 .map(ManifestEntry::toString)
                                 .collect(Collectors.joining("\n"));
-        return new RuntimeException(
-                message
-                        + "\n\n"
-                        + possibleCauses
-                        + "\n\n"
-                        + commitUserString
-                        + "\n\n"
-                        + baseEntriesString
-                        + "\n\n"
-                        + changesString,
-                cause);
+
+        RuntimeException fullException =
+                new RuntimeException(
+                        message
+                                + "\n\n"
+                                + possibleCauses
+                                + "\n\n"
+                                + commitUserString
+                                + "\n\n"
+                                + baseEntriesString
+                                + "\n\n"
+                                + changesString,
+                        cause);
+
+        RuntimeException simplifiedException;
+        if (baseEntries.size() > maxEntry || changes.size() > maxEntry) {
+            baseEntriesString =
+                    "Base entries are:\n"
+                            + baseEntries.subList(0, 
Math.min(baseEntries.size(), maxEntry))
+                                    .stream()
+                                    .map(ManifestEntry::toString)
+                                    .collect(Collectors.joining("\n"));
+            changesString =
+                    "Changes are:\n"
+                            + changes.subList(0, Math.min(changes.size(), 
maxEntry)).stream()
+                                    .map(ManifestEntry::toString)
+                                    .collect(Collectors.joining("\n"));
+            simplifiedException =
+                    new RuntimeException(
+                            message
+                                    + "\n\n"
+                                    + possibleCauses
+                                    + "\n\n"
+                                    + commitUserString
+                                    + "\n\n"
+                                    + baseEntriesString
+                                    + "\n\n"
+                                    + changesString
+                                    + "\n\n"
+                                    + "The entry list above are not fully 
displayed, please refer to taskmanager.log for more information.",
+                            cause);
+            return Pair.of(fullException, simplifiedException);
+        } else {
+            return Pair.of(fullException, fullException);
+        }
     }
 
     private void cleanUpTmpManifests(

Reply via email to