[ 
https://issues.apache.org/jira/browse/IGNITE-28836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Vinogradov updated IGNITE-28836:
--------------------------------------
    Description: 
*Motivation*

DirectMessageWriter is on the critical path of every outgoing message. Two 
inefficiencies:
# Each of the ~30 write methods re-resolves \{{state.item().stream}} (array 
load + bounds check + field load) on every primitive write.
# \{{writeCompressedMessage()}} allocates a fresh 10K 
\{{ByteBuffer.allocateDirect()}} per compressed field (plus a doubling 
re-allocation chain for payloads above 10K) and a brand-new 
\{{DirectMessageWriter}} per field.

*Fix*

# Cache the current stream in \{{curStream}}, refreshed only when the current 
state item changes (\{{setBuffer}} / \{{beforeNestedWrite}} / 
\{{afterNestedWrite}}).
# \{{CompressedMessage}} consumes the scratch buffer right in its constructor 
(deflates into its own byte[]), so the buffer never escapes 
\{{writeCompressedMessage()}}: the writer now keeps one reusable heap scratch 
buffer (retained at the largest size seen) and a thread-confined reusable 
\{{tmpWriter}}, and grows the scratch buffer without an intermediate byte[] 
copy.

No wire-format or public API changes; behavior-preserving, safe to backport.

*Benchmark* (\{{JmhDirectMessageWriterBenchmark}}, added by the patch: an 
exchange-style message with two compressed map fields sized below/above the 
initial 10K scratch, and a ~35-field primitive message; JDK 17, -prof gc, 
master vs patched)

||benchmark||master||patched||delta||
|compressedMapFields(30), throughput|17 550 ops/s|20 037 ops/s|*+14%*|
|compressedMapFields(30), allocations|26.7 KB/op heap + 20 KB/op direct|24.3 
KB/op heap, zero direct|-9% heap, no Cleaner churn|
|compressedMapFields(30), GC time|192 ms|9 ms|*x21 less*|
|compressedMapFields(500), throughput|2 382 ops/s|2 475 ops/s|+4% (within 
error)|
|compressedMapFields(500), allocations|365 KB/op heap + ~300 KB/op direct|322 
KB/op heap, zero direct|-12% heap, no Cleaner churn|
|compressedMapFields(500), GC time|56 ms|11 ms|*x5 less*|
|primitiveFields, throughput|34.9M ops/s|35.3M ops/s|within error|

Master's direct-buffer churn is invisible to gc.alloc.rate.norm but shows up as 
GC time: Cleaner processing makes collections an order of magnitude more 
expensive at the same collection counts.

*Testing*
* DirectMarshallingMessagesTest — nested containers written through 16-byte 
chunks (multi-pass resume across buffer swaps, exercises the curStream refresh 
points).
* CompressedMessageTest — >40K compressed payload (exercises the scratch growth 
path), byte-for-byte writer-to-reader round-trip.

  was:
h3. Motivation

org.apache.ignite.internal.direct.DirectMessageWriter is on the critical path 
of every outgoing message: generated serializers call it field by field, and 
the NIO write loop re-enters it for every network buffer. Two inefficiencies 
show up there:

# Per-field stream resolution. Each of the ~32 writeXxx methods starts with 
\{{DirectByteBufferStream stream = state.item().stream;}}, i.e. stack[pos] 
(array load + bounds check) followed by a field load — re-evaluated on every 
primitive write. The current stream only changes on setBuffer / 
beforeNestedWrite / afterNestedWrite.
# Per-field allocations in the compressed path. writeCompressedMessage() 
allocates, for every compressed field, a fresh ByteBuffer.allocateDirect(10KB) 
plus a brand-new DirectMessageWriter (its own state stack + stream). The 
scratch buffer is only ever copied into a heap byte[] by 
CompressedMessage.compress() (via buf.get(...)) before deflating, so the direct 
allocation (native alloc + zeroing + Cleaner/GC reclamation) is pure overhead. 
Heavy exchange messages (GridDhtPartitionsSingleMessage / FullMessage) carry 
several compressed maps each, multiplying the cost during PME.

h3. Proposed changes

* Cache the current state item's stream in a curStream field; refresh it only 
in setBuffer, beforeNestedWrite, afterNestedWrite. All writeXxx methods use 
curStream instead of re-resolving state.item().stream.
* In writeCompressedMessage():
** use ByteBuffer.allocate() (heap) for the scratch buffer instead of 
allocateDirect();
** reuse a lazily-created, thread-confined tmpWriter (reset() before each use) 
instead of allocating a new writer per field — mirroring how the main writer is 
already reused across messages;
** grow the scratch buffer without the intermediate byte[] copy.

No wire-format change, no public API change.

h3. Benchmark (JMH, JDK 17, throughput; A/B baseline vs patched)

|| Benchmark || Baseline || Patched || Delta ||
| hotPathPrimitiveFields (1792 write calls/op) | ~551K ops/s | ~682K ops/s | 
+24% |
| compressed scratch acquire (direct+new -> heap+reuse) | 1.21M ops/s | 3.68M 
ops/s | x3.0 |
| compressed scratch: GC time | 1006 ms | 130 ms | x8 less |

The compressed path trades a little cheap young-gen heap churn for the 
elimination of off-heap / Cleaner direct-buffer churn, cutting total GC time 
~8x.

h3. Testing

* A JMH benchmark JmhDirectMessageWriterBenchmark is added under 
modules/benchmarks.
* Correctness verified by byte-for-byte writer->reader round-trips, identical 
between baseline and patched:
** primitives/arrays/String/UUID, 5000 records, 32-byte write buffer (thousands 
of setBuffer cycles);
** compressed map (4000 entries -> exercises scratch-buffer doubling; 16-byte 
chunks; second marshal reusing the writer -> exercises the tmpWriter.reset() 
branch).
* Existing DirectMarshallingMessagesTest covers the nested / compressed 
serialization paths.

h3. Compatibility / Risk

Behavior-preserving, no protocol change; safe to backport.


> DirectMessageWriter: reduce per-field overhead and per-message allocations on 
> the message serialization hot path
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: IGNITE-28836
>                 URL: https://issues.apache.org/jira/browse/IGNITE-28836
>             Project: Ignite
>          Issue Type: Task
>            Reporter: Anton Vinogradov
>            Assignee: Anton Vinogradov
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> *Motivation*
> DirectMessageWriter is on the critical path of every outgoing message. Two 
> inefficiencies:
> # Each of the ~30 write methods re-resolves \{{state.item().stream}} (array 
> load + bounds check + field load) on every primitive write.
> # \{{writeCompressedMessage()}} allocates a fresh 10K 
> \{{ByteBuffer.allocateDirect()}} per compressed field (plus a doubling 
> re-allocation chain for payloads above 10K) and a brand-new 
> \{{DirectMessageWriter}} per field.
> *Fix*
> # Cache the current stream in \{{curStream}}, refreshed only when the current 
> state item changes (\{{setBuffer}} / \{{beforeNestedWrite}} / 
> \{{afterNestedWrite}}).
> # \{{CompressedMessage}} consumes the scratch buffer right in its constructor 
> (deflates into its own byte[]), so the buffer never escapes 
> \{{writeCompressedMessage()}}: the writer now keeps one reusable heap scratch 
> buffer (retained at the largest size seen) and a thread-confined reusable 
> \{{tmpWriter}}, and grows the scratch buffer without an intermediate byte[] 
> copy.
> No wire-format or public API changes; behavior-preserving, safe to backport.
> *Benchmark* (\{{JmhDirectMessageWriterBenchmark}}, added by the patch: an 
> exchange-style message with two compressed map fields sized below/above the 
> initial 10K scratch, and a ~35-field primitive message; JDK 17, -prof gc, 
> master vs patched)
> ||benchmark||master||patched||delta||
> |compressedMapFields(30), throughput|17 550 ops/s|20 037 ops/s|*+14%*|
> |compressedMapFields(30), allocations|26.7 KB/op heap + 20 KB/op direct|24.3 
> KB/op heap, zero direct|-9% heap, no Cleaner churn|
> |compressedMapFields(30), GC time|192 ms|9 ms|*x21 less*|
> |compressedMapFields(500), throughput|2 382 ops/s|2 475 ops/s|+4% (within 
> error)|
> |compressedMapFields(500), allocations|365 KB/op heap + ~300 KB/op direct|322 
> KB/op heap, zero direct|-12% heap, no Cleaner churn|
> |compressedMapFields(500), GC time|56 ms|11 ms|*x5 less*|
> |primitiveFields, throughput|34.9M ops/s|35.3M ops/s|within error|
> Master's direct-buffer churn is invisible to gc.alloc.rate.norm but shows up 
> as GC time: Cleaner processing makes collections an order of magnitude more 
> expensive at the same collection counts.
> *Testing*
> * DirectMarshallingMessagesTest — nested containers written through 16-byte 
> chunks (multi-pass resume across buffer swaps, exercises the curStream 
> refresh points).
> * CompressedMessageTest — >40K compressed payload (exercises the scratch 
> growth path), byte-for-byte writer-to-reader round-trip.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to