Saulius Valatka created FLINK-39754:
---------------------------------------
Summary: DataOutputSerializer.resize() int overflow causes O(n²)
growth near 2 GB
Key: FLINK-39754
URL: https://issues.apache.org/jira/browse/FLINK-39754
Project: Flink
Issue Type: Bug
Components: API / Core
Affects Versions: 2.2.1, 2.1.2, 1.20.4
Reporter: Saulius Valatka
h3. Summary
{{DataOutputSerializer.resize(int)}} in {{flink-core}} uses {{int}} arithmetic
to compute the new buffer length:
{code:java}
int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd);
{code}
Once {{buffer.length > Integer.MAX_VALUE / 2}} (1 GiB, ~1.07 GB),
{{buffer.length * 2}} overflows to a negative {{int}}. {{Math.max}} then picks
{{buffer.length + minCapacityAdd}}, which is typically only a few bytes larger
than the current length. From that point on, every {{resize()}} call grows the
buffer by {{minCapacityAdd}} bytes instead of doubling, and each call does a
full {{System.arraycopy}} of the 1+ GB buffer. The result is an O(n^2^) memcpy
loop that on large heaps looks like a silent hang. Only when {{buffer.length +
minCapacityAdd}} eventually overflows as well does the existing {{catch
(NegativeArraySizeException)}} translate the failure into an {{IOException}}.
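The arithmetic can be checked in isolation. The following is a minimal sketch that copies only the growth expression into a standalone class; {{ResizeOverflowDemo}} and {{buggyNewLength}} are illustrative names, not part of Flink.

```java
/** Standalone illustration of the growth expression; no Flink classes are used. */
final class ResizeOverflowDemo {

    /** Same expression as in resize(), with the array length passed in as a plain int. */
    static int buggyNewLength(int currentLength, int minCapacityAdd) {
        return Math.max(currentLength * 2, currentLength + minCapacityAdd);
    }

    public static void main(String[] args) {
        int oneGiB = 1 << 30; // 1,073,741,824: first length for which doubling overflows

        // Just below the threshold, doubling still wins.
        System.out.println(buggyNewLength(oneGiB - 1, 4)); // 2147483646

        // At the threshold, length * 2 wraps to Integer.MIN_VALUE, so Math.max
        // falls back to length + minCapacityAdd.
        System.out.println(oneGiB * 2);                // -2147483648
        System.out.println(buggyNewLength(oneGiB, 4)); // 1073741828

        // Near the end of the int range both operands overflow and the result is negative.
        System.out.println(buggyNewLength(Integer.MAX_VALUE - 2, 4)); // -6
    }
}
```

The last case is the one that finally reaches the {{NegativeArraySizeException}} handler; everything between 1 GiB and that point takes the slow path.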
h3. File / method
[{{flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java}}|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java],
method {{resize(int minCapacityAdd)}}.
The current shape of {{resize()}} has been in place for many years and is
present in every released Flink version.
h3. Reproduction
With a {{-Xmx3g}} heap, allocate a {{DataOutputSerializer(1024)}} and call
{{writeInt(0)}} in a loop. Once {{position}} crosses ~1.07 GB, throughput drops
to near zero: each subsequent {{writeInt}} triggers a full-buffer
{{arraycopy}}. JFR / async-profiler shows time concentrated in
{{System.arraycopy}} inside {{resize()}}.
Encountered in production while checkpointing an {{IcebergEnumeratorState}}
containing ~1.87M {{IcebergSourceSplit}} entries.
{{IcebergEnumeratorStateSerializer.serializePendingSplits()}} serializes every
split into a single {{DataOutputSerializer}} buffer, which crosses the ~1.07 GB
threshold and then hangs.
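To get a feel for the cost without allocating gigabytes, the copy volume can be estimated with a small simulation. This is a sketch under assumptions: it models only the growth expression and a 4-byte append into a buffer that is already full, it does not call Flink, and {{ResizeCostSimulation}} and {{bytesCopied}} are hypothetical names.

```java
/** Counts the bytes resize() would copy, without allocating any buffers. */
final class ResizeCostSimulation {

    /**
     * Simulates appending {@code ints} 4-byte values to a buffer of {@code length} bytes that is
     * already full, and returns the total number of bytes the buggy growth policy would copy.
     */
    static long bytesCopied(int length, int ints) {
        long position = length; // the buffer starts out full
        long copied = 0;
        for (int i = 0; i < ints; i++) {
            if (position + 4 > length) {
                int newLen = Math.max(length * 2, length + 4); // expression under test
                if (newLen < 0) {
                    throw new IllegalStateException("array length overflow");
                }
                copied += position; // models System.arraycopy(buffer, 0, nb, 0, position)
                length = newLen;
            }
            position += 4;
        }
        return copied;
    }

    public static void main(String[] args) {
        // Full 512 MiB buffer: one doubling, then 999 writes with no further copy.
        System.out.println(bytesCopied(1 << 29, 1000)); // 536870912

        // Full 1 GiB buffer: doubling overflows, so every write copies the whole buffer.
        System.out.println(bytesCopied(1 << 30, 1000)); // 1073743822000
    }
}
```

Under this model, the same 1,000 writes copy about 0.5 GiB in the first case and about 1,000 GiB in the second, which is consistent with the profile being dominated by {{System.arraycopy}}.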
h3. Expected behavior
Either grow cleanly up to the largest array size that is safe across JVMs
({{Integer.MAX_VALUE - 8}}, the same soft limit the JDK collections use) and
then throw an actionable {{IOException}}, or throw immediately when the
requested size would exceed the cap. No silent O(n^2^) hang.
h3. Proposed fix
Extract the size computation into a {{@VisibleForTesting}} static helper that
uses {{long}} arithmetic, validates against {{Integer.MAX_VALUE - 8}}, and
jumps to the cap once doubling would overflow (so serializations that just
barely fit under 2 GB still complete). Drop the now-unreachable {{catch
(NegativeArraySizeException)}} block. Preserve the existing
{{OutOfMemoryError}} retry path (independent of this bug). Add unit tests on
the new helper covering: normal doubling, {{minCapacityAdd}}-dominated growth,
jump-to-cap on doubling overflow, exact-cap boundary, and {{IOException}} when
required size exceeds the cap.
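One possible shape for such a helper is sketched below. It is not the actual patch: the class name {{SafeResizeSketch}}, the method name {{computeNewLength}}, and the exception message are assumptions, and {{minCapacityAdd}} is assumed to be non-negative.

```java
import java.io.IOException;

/** Sketch of an overflow-safe size computation; names are illustrative only. */
final class SafeResizeSketch {

    /** Cap assumed from the description above. */
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * Returns the new buffer length: double the current length where possible, at least
     * currentLength + minCapacityAdd, and never more than MAX_ARRAY_SIZE.
     */
    static int computeNewLength(int currentLength, int minCapacityAdd) throws IOException {
        // long arithmetic: neither the sum nor the product can wrap around
        long required = (long) currentLength + minCapacityAdd;
        if (required > MAX_ARRAY_SIZE) {
            throw new IOException(
                    "Serialization failed: required buffer size " + required
                            + " exceeds the maximum array size " + MAX_ARRAY_SIZE);
        }
        long doubled = (long) currentLength * 2;
        // Jump to the cap once doubling would exceed it, so records that barely fit still succeed.
        return (int) Math.min(Math.max(doubled, required), MAX_ARRAY_SIZE);
    }
}
```

For example, {{computeNewLength(1 << 30, 4)}} returns {{Integer.MAX_VALUE - 8}} in this sketch, so a buffer that has reached 1 GiB is resized at most once more before the {{IOException}} path is taken.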
h3. Impact
Any Flink job whose state serialization assembles a single record or object
larger than ~1 GB in a {{DataOutputSerializer}} is affected. Most commonly hit
by source enumerator state with many splits (Iceberg, file source) and by very
large keyed state values.