[ 
https://issues.apache.org/jira/browse/IGNITE-28848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18093442#comment-18093442
 ] 

Ignite TC Bot commented on IGNITE-28848:
----------------------------------------

{panel:title=Branch: [pull/13319/head] Base: [master] : No blockers 
found!|borderStyle=dashed|borderColor=#ccc|titleBGColor=#D6F7C1}{panel}
{panel:title=Branch: [pull/13319/head] Base: [master] : No new tests 
found!|borderStyle=dashed|borderColor=#ccc|titleBGColor=#F7D6C1}{panel}
[TeamCity *--> Run :: All* 
Results|https://ci2.ignite.apache.org/viewLog.html?buildId=9169643&buildTypeId=IgniteTests24Java8_RunAll]
{color:#ffffff}tcbot-analysis-comment chainBuildId=9169643 
rerunBuildIds=none{color}

> ZooKeeper discovery: stream marshalZip through DeflaterOutputStream to cut 
> allocations and peak memory
> ------------------------------------------------------------------------------------------------------
>
>                 Key: IGNITE-28848
>                 URL: https://issues.apache.org/jira/browse/IGNITE-28848
>             Project: Ignite
>          Issue Type: Task
>            Reporter: Anton Vinogradov
>            Assignee: Anton Vinogradov
>            Priority: Major
>
> {\{ZookeeperDiscoveryImpl.marshalZip()}} sits on the two busiest 
> control-plane paths of ZooKeeper discovery:
> * the coordinator re-marshals and rewrites the whole 
> \{{ZkDiscoveryEventsData}} to ZooKeeper on *every* discovery event — node 
> joins/failures and every custom discovery message (cache create/destroy, 
> exchange, services, snapshots, ...);
> * join data of each joining node and the coordinator's data-for-joined are 
> marshalled on every join — megabytes in clusters with hundreds of caches 
> (large enough to be split across znodes by \{{jute.maxbuffer}}).
> The current implementation materializes the whole uncompressed marshalled 
> form and only then compresses it, allocating ~6x the uncompressed size in 
> garbage per call and holding ~3x of it at peak. For an N-MB payload that is 
> ~6N MB of garbage and a ~3N MB heap spike *per call* — and a mass join, a 
> client reconnect storm or a rolling restart fires this repeatedly, so GC 
> pressure and heap spikes hit the coordinator exactly when it is busiest, and 
> the resulting GC pauses land on the discovery critical path (events are 
> processed sequentially by a single worker, so the whole cluster sees slower 
> event propagation). On top of that \{{Deflater.end()}} is never called, so 
> zlib native memory is reclaimed only by the Cleaner after a GC.
> This change makes the compression path streaming: allocations drop by 73–82% 
> on realistic payloads, peak memory is bounded by the compressed output 
> instead of ~3x the uncompressed size, and zlib native memory is released 
> deterministically. Per-call time stays on par or slightly better — the value 
> is coordinator stability under mass joins / topology churn, not per-call 
> speed.
> h3. Current implementation
> {code:java}
> return zip(U.marshal(marsh, obj));
> {code}
> For an uncompressed size of N bytes a single call allocates:
> * in \{{U.marshal}} — the marshalling buffer grown to >= N plus a trim copy 
> of N (\{{GridByteArrayOutputStream.toByteArray()}});
> * in \{{zip()}} — a deflate buffer of N (\{{new byte[bytes.length]}}) and an 
> output \{{GridByteArrayOutputStream}} pre-sized to N, plus the final trim 
> copy.
> Marshalled bytes, deflate buffer and output buffer are alive simultaneously — 
> hence the ~3x peak.
> h3. Change
> Stream the marshaller output straight through the compressor, so the 
> uncompressed form is never materialized:
> {code:java}
> GridByteArrayOutputStream out = new GridByteArrayOutputStream();
> try (BufferedOutputStream zipOut = new BufferedOutputStream(new 
> DeflaterOutputStream(out))) {
>     U.marshal(marsh, obj, zipOut);
> }
> return out.toByteArray();
> {code}
> * The \{{BufferedOutputStream}} is essential: \{{ObjectOutputStream}} writes 
> in ~1 KB blocks, and an unbuffered \{{DeflaterOutputStream}} pays a JNI 
> deflate call per block. The unbuffered variant was measured and rejected: 
> +14–20% time vs the current code (25.5 vs 21.3 us/op on a 1.6 KB payload, 
> 20,114 vs 17,661 us/op on 1.66 MB). The default 8 KB buffer coalesces the 
> blocks and brings the time on par with or below the current code.
> * \{{DeflaterOutputStream.close()}} ends the \{{Deflater}} it owns, so zlib 
> native memory is released deterministically instead of waiting for the 
> Cleaner.
> * The private \{{zip()}} helper is removed (\{{marshalZip}} was its only 
> caller); \{{unzip()}} stays — it is still used for 
> \{{ATTR_SECURITY_SUBJECT_V2}}. The read side (\{{unmarshalZip}}) already 
> streams through \{{InflaterInputStream}} and is not changed.
> * Same pattern as \{{DiscoveryMessageParser.marshalZip}} in the same package.
> h3. Compatibility
> The produced bytes are a regular zlib stream, same as before (default 
> \{{Deflater}} settings in both versions): zlib format unchanged, old nodes 
> inflate it as is — rolling upgrade safe. The benchmark setup asserts 
> \{{inflate(new) == inflate(old) == marshal(obj)}}.
> h3. Benchmark
> JMH, avgt, JDK 17 (corretto), Apple Silicon, fork 1, 3 warmup + 5 measurement 
> iterations, \{{-prof gc}}; old = current code, new = this change:
> ||payload (marshalled -> zipped)||time old, us/op||time new, us/op||alloc 
> old, B/op||alloc new, B/op||
> |MAP_2K (1.6 KB -> 0.7 KB)|21.3 ± 11.5|15.4 ± 0.7|12,544|14,368 (+15%)|
> |MAP_100K (132 KB -> 40 KB)|1,987 ± 512|1,931 ± 264|1,065,204|283,835 (−73%)|
> |LIST_1M (1.66 MB -> 318 KB)|17,661 ± 3,433|16,825 ± 980|9,891,984|1,788,235 
> (−82%)|
> The MAP_2K row is the trade-off, spelled out:
> * time is better even there — the new path skips the trim copy of the 
> marshalled form and the input-sized deflate buffer;
> * allocations grow by a *bounded* ~1.8 KB/call: the fixed stream overhead (8 
> KB \{{BufferedOutputStream}} + 512 B \{{DeflaterOutputStream}} buffer) 
> replaces buffers that used to scale with the payload. Rough break-even is ~3 
> KB of marshalled data; above it the new code wins and the win grows with size;
> * small payloads pass through \{{marshalZip}} rarely and outside any loop 
> (security credentials on node join, the \{{ZkJoiningNodeData}} marker when 
> join data is split, communication-error resolve state), while the frequent 
> call site — the coordinator rewriting \{{ZkDiscoveryEventsData}} on every 
> discovery event — operates on tens to hundreds of KB in clusters where this 
> path matters at all.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to