sauliusvl commented on code in PR #28252:
URL: https://github.com/apache/flink/pull/28252#discussion_r3364530930


##########
flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java:
##########
@@ -333,26 +341,52 @@ private int getUTFBytesSize(int c) {
         }
     }
 
+    /**
+     * Computes the new buffer length for a {@link #resize(int)} call.
+     *
+     * <p>Uses {@code long} arithmetic so that doubling does not silently overflow once the current
+     * buffer length crosses {@code Integer.MAX_VALUE / 2}. When doubling would exceed {@link
+     * #MAX_ARRAY_SIZE}, the buffer jumps directly to the cap rather than growing by {@code
+     * minCapacityAdd} bytes at a time — the latter would degrade every subsequent resize into a
+     * full copy of a ~1–2 GB buffer.
+     *
+     * @throws IOException if the required size exceeds {@link #MAX_ARRAY_SIZE}.
+     */
+    @VisibleForTesting
+    static int computeNewBufferLength(int currentLength, int minCapacityAdd) throws IOException {
+        long requiredLen = (long) currentLength + minCapacityAdd;
+        if (requiredLen > MAX_ARRAY_SIZE) {
+            throw new IOException(
+                    "Serialization failed because the record length ("
+                            + requiredLen
+                            + " bytes) would exceed the maximum Java array size ("
+                            + MAX_ARRAY_SIZE
+                            + " bytes).");
+        }
+        long doubledLen = (long) currentLength * 2L;
+        if (doubledLen > MAX_ARRAY_SIZE) {
+            return MAX_ARRAY_SIZE;
+        }
+        return (int) Math.max(doubledLen, requiredLen);
+    }
+
     private void resize(int minCapacityAdd) throws IOException {
-        int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd);
+        int newLen = computeNewBufferLength(this.buffer.length, minCapacityAdd);
         byte[] nb;
         try {
             nb = new byte[newLen];
-        } catch (NegativeArraySizeException e) {
-            throw new IOException(
-                    "Serialization failed because the record length would exceed 2GB (max addressable array size in Java).");
         } catch (OutOfMemoryError e) {
             // this was too large to allocate, try the smaller size (if possible)
-            if (newLen > this.buffer.length + minCapacityAdd) {
-                newLen = this.buffer.length + minCapacityAdd;
+            int minLen = this.buffer.length + minCapacityAdd;
+            if (newLen > minLen) {
                 try {
-                    nb = new byte[newLen];
+                    nb = new byte[minLen];
                 } catch (OutOfMemoryError ee) {
                     // still not possible. give an informative exception message that reports the
                     // size
                     throw new IOException(

Review Comment:
   My understanding is that we end up here only when a single, potentially
   multi-gigabyte array allocation has failed, not because many small objects
   have accumulated and exhausted the heap. There should therefore still be
   enough memory to create a new exception object here. Pre-allocating the
   exception would also lose the stack trace and the message.
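
To illustrate the last point, here is a minimal sketch. The class, method names, and messages are hypothetical and are not taken from Flink; it only contrasts an exception built once up front with one built at the failure site.

```java
import java.io.IOException;

class PreallocatedExceptionDemo {
    // Hypothetical pre-allocated exception: constructed once during class
    // initialization, so both its message and its stack trace are fixed then.
    static final IOException PREALLOCATED =
            new IOException("Serialization failed: buffer too large.");

    // requestedLen cannot be reported here because the message already exists.
    static void failPreallocated(int requestedLen) throws IOException {
        throw PREALLOCATED;
    }

    // A fresh exception can report the size and records the throwing frame.
    static void failFresh(int requestedLen, OutOfMemoryError cause) throws IOException {
        throw new IOException("Failed to allocate " + requestedLen + " bytes.", cause);
    }

    public static void main(String[] args) {
        try {
            failPreallocated(1 << 30);
        } catch (IOException e) {
            // The top frame is the class initializer, not failPreallocated.
            System.out.println(e.getMessage() + " @ " + e.getStackTrace()[0].getMethodName());
        }
        try {
            failFresh(1 << 30, new OutOfMemoryError("simulated"));
        } catch (IOException e) {
            System.out.println(e.getMessage() + " @ " + e.getStackTrace()[0].getMethodName());
        }
    }
}
```

The pre-allocated instance can carry neither the requested size nor a stack trace that points at the failing resize, which is the diagnostic information a reader of the error most needs.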

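For context on the Javadoc in the hunk above, a small sketch of why the removed `int` formula misbehaves once the buffer passes `Integer.MAX_VALUE / 2`. The class and method names are hypothetical; only the `Math.max(...)` expression is copied from the removed line, and the helper's cap is deliberately left out because the value of `MAX_ARRAY_SIZE` is not shown in the diff.

```java
class ResizeOverflowDemo {
    // The removed formula, evaluated in int arithmetic.
    static int oldNewLen(int currentLength, int minCapacityAdd) {
        return Math.max(currentLength * 2, currentLength + minCapacityAdd);
    }

    // The same doubling in long arithmetic, as the new helper does before capping.
    static long doubledAsLong(int currentLength) {
        return (long) currentLength * 2L;
    }

    public static void main(String[] args) {
        int currentLength = (1 << 30) + 16; // just over Integer.MAX_VALUE / 2
        int minCapacityAdd = 8;

        // int doubling wraps around to a negative number.
        System.out.println(currentLength * 2); // -2147483616

        // Math.max then silently picks the "grow by minCapacityAdd" branch.
        System.out.println(oldNewLen(currentLength, minCapacityAdd)); // 1073741848

        // long doubling keeps the true value, which can then be compared to a cap.
        System.out.println(doubledAsLong(currentLength)); // 2147483680
    }
}
```

With the old formula, every later resize of such a buffer would add only `minCapacityAdd` bytes and copy the whole array again, which is the degradation the Javadoc describes.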

