[ 
https://issues.apache.org/jira/browse/IGNITE-28848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Vinogradov updated IGNITE-28848:
--------------------------------------
    Description: 
{{ZookeeperDiscoveryImpl.marshalZip()}} sits on the two busiest control-plane 
paths of ZooKeeper discovery:
* the coordinator re-marshals and rewrites the whole {{ZkDiscoveryEventsData}} 
to ZooKeeper on *every* discovery event: node joins/failures and every custom 
discovery message (cache create/destroy, exchange, services, snapshots, ...);
* join data of each joining node and the coordinator's data-for-joined are 
marshalled on every join. These payloads reach megabytes in clusters with 
hundreds of caches (large enough to exceed {{jute.maxbuffer}} and be split 
across znodes).

The current implementation materializes the whole uncompressed marshalled form 
and only then compresses it, allocating ~6x the uncompressed size in garbage 
per call and holding ~3x of it at peak. For an N-MB payload that is ~6N MB of 
garbage and a ~3N MB heap spike *per call*. A mass join, a client reconnect 
storm or a rolling restart fires this repeatedly, so GC pressure and heap 
spikes hit the coordinator exactly when it is busiest. The resulting GC pauses 
land on the discovery critical path: events are processed sequentially by a 
single worker, so the whole cluster sees slower event propagation. On top of 
that, {{Deflater.end()}} is never called, so zlib native memory is reclaimed 
only by the Cleaner after a GC.

This change makes the compression path streaming: allocations drop by 73–82% on 
realistic payloads, peak memory is bounded by the compressed output instead of 
~3x the uncompressed size, and zlib native memory is released 
deterministically. Per-call time stays on par or slightly better — the value is 
coordinator stability under mass joins / topology churn, not per-call speed.

h3. Current implementation

{code:java}
return zip(U.marshal(marsh, obj));
{code}

For an uncompressed size of N bytes a single call allocates:
* in {{U.marshal}}: the marshalling buffer grown to >= N plus a trim copy of 
N ({{GridByteArrayOutputStream.toByteArray()}});
* in {{zip()}}: a deflate buffer of N ({{new byte[bytes.length]}}) and an 
output {{GridByteArrayOutputStream}} pre-sized to N, plus the final trim copy.

Marshalled bytes, deflate buffer and output buffer are alive simultaneously — 
hence the ~3x peak.
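
To make the allocation accounting above concrete, here is a minimal sketch of a buffer-then-compress path built only from JDK classes. It is a reconstruction from the description, not the Ignite source: {{BufferThenCompressSketch}} and its methods are hypothetical names, {{ObjectOutputStream}} stands in for {{U.marshal}}, and {{ByteArrayOutputStream}} stands in for {{GridByteArrayOutputStream}}. Unlike the code described above, the sketch does call {{Deflater.end()}}.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.Deflater;
import java.util.zip.InflaterInputStream;

/** Hypothetical stand-in for the buffer-then-compress path; not the Ignite source. */
class BufferThenCompressSketch {
    /** Stand-in for U.marshal(marsh, obj): JDK serialization into a growing buffer. */
    static byte[] marshal(Object obj) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream(); // grows to >= N

        try (ObjectOutputStream oos = new ObjectOutputStream(buf)) {
            oos.writeObject(obj);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        return buf.toByteArray(); // trim copy of N
    }

    /** Stand-in for zip(): input-sized deflate buffer plus a pre-sized output buffer. */
    static byte[] zip(byte[] bytes) {
        Deflater deflater = new Deflater();

        try {
            deflater.setInput(bytes);
            deflater.finish();

            ByteArrayOutputStream out = new ByteArrayOutputStream(bytes.length); // pre-sized to N
            // Deflate buffer of N; at least 1 byte so an empty input cannot loop forever.
            byte[] buf = new byte[Math.max(1, bytes.length)];

            while (!deflater.finished())
                out.write(buf, 0, deflater.deflate(buf));

            return out.toByteArray(); // final trim copy
        }
        finally {
            // The description says the original never calls end(); the sketch does.
            deflater.end();
        }
    }

    /** Helper for checking the round trip. */
    static byte[] unzip(byte[] zipped) {
        try (InflaterInputStream in = new InflaterInputStream(new ByteArrayInputStream(zipped))) {
            return in.readAllBytes();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> payload = new ArrayList<>();

        for (int i = 0; i < 10_000; i++)
            payload.add("cache-" + i);

        byte[] marshalled = marshal(payload);
        byte[] zipped = zip(marshalled);

        System.out.println(marshalled.length + " -> " + zipped.length + " bytes");
    }
}
```

In {{zip()}} the marshalled array, the deflate buffer and the output buffer are all reachable at the same time, which is where the ~3x peak comes from.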

h3. Change

Stream the marshaller output straight through the compressor, so the 
uncompressed form is never materialized:

{code:java}
GridByteArrayOutputStream out = new GridByteArrayOutputStream();

try (BufferedOutputStream zipOut = new BufferedOutputStream(new DeflaterOutputStream(out))) {
    U.marshal(marsh, obj, zipOut);
}

return out.toByteArray();
{code}
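
The same streaming pattern can be tried outside Ignite with JDK classes only. This is a sketch under assumptions: {{StreamingMarshalZipSketch}} is a hypothetical name, {{ObjectOutputStream}} stands in for {{U.marshal(marsh, obj, zipOut)}}, and {{ByteArrayOutputStream}} stands in for {{GridByteArrayOutputStream}}. The read side mirrors what the description says about {{unmarshalZip}} (inflate while reading).

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

/** Hypothetical JDK-only illustration of streaming marshal + deflate. */
class StreamingMarshalZipSketch {
    static byte[] marshalZip(Object obj) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        // Resources close in reverse order; closing the DeflaterOutputStream
        // finishes the zlib stream and ends the Deflater it created.
        try (DeflaterOutputStream def = new DeflaterOutputStream(out);
             BufferedOutputStream buf = new BufferedOutputStream(def);
             ObjectOutputStream oos = new ObjectOutputStream(buf)) {
            oos.writeObject(obj); // serialized bytes flow straight into the compressor
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        return out.toByteArray(); // only the compressed form is ever held whole
    }

    static Object unmarshalZip(byte[] zipped) {
        try (ObjectInputStream ois = new ObjectInputStream(
            new InflaterInputStream(new ByteArrayInputStream(zipped)))) {
            return ois.readObject();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<String> payload = new ArrayList<>();

        for (int i = 0; i < 10_000; i++)
            payload.add("cache-" + i);

        byte[] zipped = marshalZip(payload);

        System.out.println("zipped bytes: " + zipped.length);
        System.out.println("round trip ok: " + payload.equals(unmarshalZip(zipped)));
    }
}
```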

* The {{BufferedOutputStream}} is essential: {{ObjectOutputStream}} writes in 
~1 KB blocks, and an unbuffered {{DeflaterOutputStream}} pays a JNI deflate 
call per block. The unbuffered variant was measured and rejected: +14–20% time 
vs the current code (25.5 vs 21.3 us/op on a 1.6 KB payload, 20,114 vs 17,661 
us/op on 1.66 MB). The default 8 KB buffer coalesces the blocks and brings the 
time on par with or below the current code.
* {{DeflaterOutputStream.close()}} ends the {{Deflater}} it owns, so zlib 
native memory is released deterministically instead of waiting for the Cleaner.
* The private {{zip()}} helper is removed ({{marshalZip}} was its only 
caller); {{unzip()}} stays because it is still used for 
{{ATTR_SECURITY_SUBJECT_V2}}. The read side ({{unmarshalZip}}) already 
streams through {{InflaterInputStream}} and is not changed.
* Same pattern as {{DiscoveryMessageParser.marshalZip}} in the same package.
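
The coalescing effect of the buffer can be observed without a benchmark by counting how many write calls reach the stream that would be the compressor. The sketch below is illustrative only: {{WriteCoalescingSketch}} and {{CountingSink}} are hypothetical names, plain JDK serialization stands in for the Ignite marshaller, and the sink discards data instead of deflating it. It shows call counts, not timings.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo: how many writes reach the downstream stream. */
class WriteCoalescingSketch {
    /** Discards data and counts the write calls that reach it. */
    static final class CountingSink extends OutputStream {
        int calls;

        @Override public void write(int b) {
            calls++;
        }

        @Override public void write(byte[] b, int off, int len) {
            calls++;
        }
    }

    /** Serializes obj into a counting sink, optionally through an 8 KB buffer. */
    static int countWrites(Object obj, boolean buffered) {
        CountingSink sink = new CountingSink();
        OutputStream target = buffered ? new BufferedOutputStream(sink) : sink;

        try (ObjectOutputStream oos = new ObjectOutputStream(target)) {
            oos.writeObject(obj);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        return sink.calls;
    }

    /** Sample payload of distinct strings, a few hundred KB when serialized. */
    static List<String> samplePayload() {
        List<String> payload = new ArrayList<>();

        for (int i = 0; i < 20_000; i++)
            payload.add("cache-" + i);

        return payload;
    }

    public static void main(String[] args) {
        List<String> payload = samplePayload();

        System.out.println("unbuffered writes: " + countWrites(payload, false));
        System.out.println("buffered writes:   " + countWrites(payload, true));
    }
}
```

With a {{DeflaterOutputStream}} in place of the sink, each counted call would be a separate pass into the deflater, which is the per-block cost the first bullet describes.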

h3. Compatibility

The produced bytes are a regular zlib stream, same as before (default 
{{Deflater}} settings in both versions), so old nodes inflate it as is and a 
rolling upgrade is safe. The benchmark setup asserts 
{{inflate(new) == inflate(old) == marshal(obj)}}.
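
A check of the same shape as that assertion can be sketched with JDK classes alone. The names below ({{ZlibCompatSketch}}, {{deflateOneShot}}, {{deflateStreaming}}) are hypothetical and the input is an arbitrary byte array rather than marshalled Ignite data. The sketch compares the inflated results only and makes no claim about the compressed bytes being identical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

/** Hypothetical check that both compression styles inflate to the same bytes. */
class ZlibCompatSketch {
    /** Old style: whole input handed to a default Deflater at once. */
    static byte[] deflateOneShot(byte[] raw) {
        Deflater def = new Deflater();

        try {
            def.setInput(raw);
            def.finish();

            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];

            while (!def.finished())
                out.write(buf, 0, def.deflate(buf));

            return out.toByteArray();
        }
        finally {
            def.end();
        }
    }

    /** New style: input streamed through DeflaterOutputStream (default Deflater). */
    static byte[] deflateStreaming(byte[] raw) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        try (DeflaterOutputStream zipOut = new DeflaterOutputStream(out)) {
            zipOut.write(raw);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        return out.toByteArray();
    }

    /** Reader shared by both variants, as an old node would inflate either one. */
    static byte[] inflate(byte[] zipped) {
        try (InflaterInputStream in = new InflaterInputStream(new ByteArrayInputStream(zipped))) {
            return in.readAllBytes();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] raw = "join data ".repeat(1_000).getBytes(StandardCharsets.UTF_8);

        boolean same = Arrays.equals(inflate(deflateOneShot(raw)), raw)
            && Arrays.equals(inflate(deflateStreaming(raw)), raw);

        System.out.println("both variants inflate to the input: " + same);
    }
}
```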

h3. Benchmark

JMH, avgt, JDK 17 (corretto), Apple Silicon, fork 1, 3 warmup + 5 measurement 
iterations, {{-prof gc}}; old = current code, new = this change:

||payload (marshalled -> zipped)||time old, us/op||time new, us/op||alloc old, B/op||alloc new, B/op||
|MAP_2K (1.6 KB -> 0.7 KB)|21.3 ± 11.5|15.4 ± 0.7|12,544|14,368 (+15%)|
|MAP_100K (132 KB -> 40 KB)|1,987 ± 512|1,931 ± 264|1,065,204|283,835 (−73%)|
|LIST_1M (1.66 MB -> 318 KB)|17,661 ± 3,433|16,825 ± 980|9,891,984|1,788,235 (−82%)|

The MAP_2K row is the trade-off, spelled out:
* mean time is lower even there (15.4 vs 21.3 us/op, although the old 
measurement is noisy at ± 11.5): the new path skips the trim copy of the 
marshalled form and the input-sized deflate buffer;
* allocations grow by a *bounded* ~1.8 KB/call: the fixed stream overhead (8 KB 
{{BufferedOutputStream}} + 512 B {{DeflaterOutputStream}} buffer) replaces 
buffers that used to scale with the payload. Rough break-even is ~3 KB of 
marshalled data; above it the new code wins and the win grows with size;
* small payloads pass through {{marshalZip}} rarely and outside any loop 
(security credentials on node join, the {{ZkJoiningNodeData}} marker when join 
data is split, communication-error resolve state), while the frequent call 
site, the coordinator rewriting {{ZkDiscoveryEventsData}} on every discovery 
event, operates on tens to hundreds of KB in clusters where this path matters 
at all.

  was:
h3. Problem

{{ZookeeperDiscoveryImpl.marshalZip()}} materializes the whole uncompressed 
marshalled form before compressing it:

{code:java}
return zip(U.marshal(marsh, obj));
{code}

For an uncompressed size of N bytes a single call allocates:
* in {{U.marshal}}: the marshalling buffer grown to >= N plus a trim copy of 
N ({{GridByteArrayOutputStream.toByteArray()}});
* in {{zip()}}: a deflate buffer of N ({{new byte[bytes.length]}}) and an 
output {{GridByteArrayOutputStream}} pre-sized to N, plus the final trim copy.

That is roughly 6x the uncompressed size in garbage and ~3x in peak live memory 
(marshalled bytes, deflate buffer and output buffer are alive simultaneously). 
{{Deflater.end()}} is never called, so zlib native memory is released only by 
the Cleaner after a GC.

{{marshalZip()}} runs on the discovery control plane:
* joining node data and the coordinator's data-for-joined: megabytes in 
clusters with hundreds of caches (large enough to be split across znodes by 
{{jute.maxbuffer}});
* the coordinator re-marshals {{ZkDiscoveryEventsData}} on every discovery 
event;
* security credentials / security subject.

h3. Change

Stream the marshaller output straight through the compressor, so the 
uncompressed form is never materialized:

{code:java}
GridByteArrayOutputStream out = new GridByteArrayOutputStream();

try (BufferedOutputStream zipOut = new BufferedOutputStream(new DeflaterOutputStream(out))) {
    U.marshal(marsh, obj, zipOut);
}

return out.toByteArray();
{code}

* The {{BufferedOutputStream}} is essential: {{ObjectOutputStream}} writes in 
~1 KB blocks, and an unbuffered {{DeflaterOutputStream}} pays a JNI deflate 
call per block. The unbuffered variant was measured and rejected: +14-20% time 
vs the current code (25.5 vs 21.3 us/op on a 1.6 KB payload, 20,114 vs 17,661 
us/op on 1.66 MB). The default 8 KB buffer coalesces the blocks and brings the 
time on par with or below the current code.
* {{DeflaterOutputStream.close()}} ends the {{Deflater}} it owns, so zlib 
native memory is released deterministically instead of waiting for the Cleaner.
* The private {{zip()}} helper is removed ({{marshalZip}} was its only 
caller); {{unzip()}} stays because it is still used for 
{{ATTR_SECURITY_SUBJECT_V2}}. The read side ({{unmarshalZip}}) already 
streams through {{InflaterInputStream}} and is not changed.
* Same pattern as {{DiscoveryMessageParser.marshalZip}} in the same package.

h3. Compatibility

The produced bytes are a regular zlib stream, same as before (default 
{{Deflater}} settings in both versions): zlib format unchanged, old nodes 
inflate it as is, so a rolling upgrade is safe. The benchmark setup asserts 
{{inflate(new) == inflate(old) == marshal(obj)}}.

h3. Benchmark

JMH, avgt, JDK 17 (corretto), Apple Silicon, fork 1, 3 warmup + 5 measurement 
iterations, {{-prof gc}}; old = current code, new = this change:

||payload (marshalled -> zipped)||time old, us/op||time new, us/op||alloc old, B/op||alloc new, B/op||
|MAP_2K (1.6 KB -> 0.7 KB)|21.3 ± 11.5|15.4 ± 0.7|12,544|14,368 (+15%)|
|MAP_100K (132 KB -> 40 KB)|1,987 ± 512|1,931 ± 264|1,065,204|283,835 (−73%)|
|LIST_1M (1.66 MB -> 318 KB)|17,661 ± 3,433|16,825 ± 980|9,891,984|1,788,235 (−82%)|

The MAP_2K row is the trade-off, spelled out:
* time is better even there: the new path skips the trim copy of the 
marshalled form and the input-sized deflate buffer;
* allocations grow by a *bounded* ~1.8 KB/call: the fixed stream overhead (8 KB 
{{BufferedOutputStream}} + 512 B {{DeflaterOutputStream}} buffer) replaces 
buffers that used to scale with the payload. Rough break-even is ~3 KB of 
marshalled data; above it the new code wins and the win grows with size;
* small payloads pass through {{marshalZip}} rarely and outside any loop 
(security credentials on node join, the {{ZkJoiningNodeData}} marker when join 
data is split, communication-error resolve state), while the frequent call 
site, the coordinator rewriting {{ZkDiscoveryEventsData}} on every discovery 
event, operates on tens to hundreds of KB in clusters where this path matters 
at all.

h3. Expected effect

This is a control-plane path, so the point is not per-call latency (time is on 
par or better): it is GC pressure and peak heap on the coordinator during mass 
joins / topology churn, where join data and events data are marshalled 
repeatedly — allocations drop by 73-82% on realistic payloads, peak usage no 
longer holds ~3x the uncompressed size, and zlib native memory is released 
deterministically.


> ZooKeeper discovery: stream marshalZip through DeflaterOutputStream to cut 
> allocations and peak memory
> ------------------------------------------------------------------------------------------------------
>
>                 Key: IGNITE-28848
>                 URL: https://issues.apache.org/jira/browse/IGNITE-28848
>             Project: Ignite
>          Issue Type: Task
>            Reporter: Anton Vinogradov
>            Assignee: Anton Vinogradov
>            Priority: Major
>


