This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch perf-tls-jdk-heap-buffers
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit f3facb01993bfc0135479bdfba0692f3ad800552
Author: He-Pin <[email protected]>
AuthorDate: Sun May 31 20:13:25 2026 +0800

    perf(stream): heap transport buffers for the TLS GraphStage
    
    Motivation:
    The JDK SSLEngine works on byte[] internally and makes an extra
    direct<->heap copy on every wrap/unwrap when handed a direct buffer (see
    Netty's SslHandler, SslEngineType.JDK). TlsGraphStage took its three
    transport buffers from the TCP direct BufferPool, so every record paid
    that copy and each connection pinned ~384 KiB of direct memory for
    buffers that normally only hold a single ~16 KiB TLS record.
    
    Modification:
    Allocate the transport buffers on the heap and size them on demand like
    Netty: start at one packet and let growTransportOutBuffer enlarge the
    wrap buffer (capped at MaxApplicationRecordsPerEngineCall packets) only
    when a larger batch is actually produced. Remove the now-unused
    BufferPool/Tcp wiring and the pool-derived applicationBufferSize helper.
    
    Result:
    Per-record heap allocation drops ~5-8% for 4 KiB and 64 KiB payloads and
    stays flat for 256 B (on-demand sizing avoids a fixed-buffer penalty);
    direct memory per connection drops from ~384 KiB to zero. Behaviour is
    unchanged.
    
    Tests:
    - stream-tests/testOnly *TlsGraphStageIsolatedSpec *TlsGraphStageEdgeCasesSpec
      *TlsGraphStageSpec *TlsSpec *TlsEngineSelectionSpec -> 246 succeeded, 0 failed
    - bench-jmh TlsBenchmark.warmRoundTrip -prof gc (graphstage), gc.alloc.rate.norm:
      256B 1657->1672 B/op, 4096B 20559->18980 B/op, 65536B 301864->286199 B/op
    
    References:
    Refs #2878
---
 .../stream/io/TlsGraphStageIsolatedSpec.scala      | 11 ++++
 .../pekko/stream/impl/io/TlsGraphStage.scala       | 63 +++++++---------------
 2 files changed, 30 insertions(+), 44 deletions(-)

diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageIsolatedSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageIsolatedSpec.scala
index 53390bef9f..72fc09311b 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageIsolatedSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageIsolatedSpec.scala
@@ -234,5 +234,16 @@ class TlsGraphStageIsolatedSpec extends StreamSpec(TlsSpec.configOverrides) with
 
       outputs.foldLeft(ByteString.empty)(_ ++ _) shouldEqual expected
     }
+
+    "grow the on-demand transport buffers for multi-record payloads without 
losing bytes" in {
+      // Transport buffers start at a single packet and grow on demand (heap, 
to avoid the JDK
+      // SSLEngine's direct<->heap copy). A payload spanning many records 
forces transportOut to grow
+      // past its initial size and then be reused across records; the round 
trip must still deliver
+      // every byte in order.
+      val payloadSize = 200 * 1024
+      val payload = ByteString(Array.tabulate[Byte](payloadSize)(i => (i % 
251).toByte))
+
+      roundTrip(initSslContext("TLSv1.2"), List(payload), timeout = 
30.seconds) shouldEqual payload
+    }
   }
 }
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/io/TlsGraphStage.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/io/TlsGraphStage.scala
index f700db110b..40e1e0c701 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/io/TlsGraphStage.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/io/TlsGraphStage.scala
@@ -30,7 +30,6 @@ import scala.util.control.NonFatal
 
 import org.apache.pekko
 import pekko.annotation.InternalApi
-import pekko.io.{ BufferPool, Tcp }
 import pekko.stream._
 import pekko.stream.TLSProtocol._
 import pekko.stream.impl.Stages.DefaultAttributes
@@ -78,12 +77,11 @@ import pekko.util.ByteString
       private var transportInBuffer: ByteBuffer = _
       private var transportUnreadBuffer: ByteBuffer = _
       private var userInBuffer: ByteBuffer = _
-      private var bufferPool: BufferPool = _
-      private var pooledBufferSize = 0
       private var transportPacketSize = 0
       private var applicationRecordSize = 0
       private var maxUserInBufferSize = 0
       private var maxUserOutBufferSize = 0
+      private var maxTransportOutBufferSize = 0
 
       private val userInput = new UserInputSlot(plainIn)
       private val transportInput = new TransportInputSlot(cipherIn)
@@ -104,10 +102,6 @@ import pekko.util.ByteString
 
       override def preStart(): Unit =
         try {
-          val tcp = Tcp(materializer.system)
-          bufferPool = tcp.bufferPool
-          pooledBufferSize = tcp.Settings.DirectBufferSize
-
           engine = createSSLEngine()
           currentSession = engine.getSession
           allocateBuffers(currentSession)
@@ -135,15 +129,20 @@ import pekko.util.ByteString
       private def allocateBuffers(session: SSLSession): Unit = {
         val packetSize = math.max(1, session.getPacketBufferSize)
         val applicationSize = math.max(1, session.getApplicationBufferSize)
-        val applicationBatchSize = applicationBufferSize(applicationSize, 
packetSize)
         transportPacketSize = packetSize
         applicationRecordSize = applicationSize
-        maxUserInBufferSize = applicationBatchSize
-        maxUserOutBufferSize = applicationBatchSize
-
-        transportOutBuffer = acquireTransportBuffer(packetSize)
-        transportInBuffer = acquireTransportBuffer(packetSize)
-        transportUnreadBuffer = acquireTransportBuffer(packetSize)
+        maxUserInBufferSize = applicationSize * 
MaxApplicationRecordsPerEngineCall
+        maxUserOutBufferSize = applicationSize * 
MaxApplicationRecordsPerEngineCall
+        maxTransportOutBufferSize = packetSize * 
MaxApplicationRecordsPerEngineCall
+
+        // The JDK SSLEngine operates on byte[] internally and makes an extra 
direct<->heap copy per
+        // wrap/unwrap when handed a direct buffer (see Netty SslHandler's 
SslEngineType.JDK). Use heap
+        // buffers and, like Netty, size them on demand: start at a single 
record so small-payload or
+        // short-lived connections allocate little, and let 
growTransportOutBuffer enlarge the wrap
+        // buffer when a larger batch is actually produced.
+        transportOutBuffer = ByteBuffer.allocate(packetSize)
+        transportInBuffer = ByteBuffer.allocate(packetSize)
+        transportUnreadBuffer = ByteBuffer.allocate(packetSize)
         userInBuffer = ByteBuffer.allocate(applicationSize)
         userOutBuffer = ByteBuffer.allocate(applicationSize)
 
@@ -152,12 +151,6 @@ import pekko.util.ByteString
         TlsEngineHelpers.emptyReadBuffer(transportUnreadBuffer)
       }
 
-      private def applicationBufferSize(applicationSize: Int, packetSize: 
Int): Int = {
-        val recordsThatFitTransportBuffer = math.max(1, pooledBufferSize / 
packetSize)
-        val recordsPerEngineCall = 
math.min(MaxApplicationRecordsPerEngineCall, recordsThatFitTransportBuffer)
-        applicationSize * recordsPerEngineCall
-      }
-
       private def runPump(): Unit =
         if ((stateBits & (StageClosedFlag | StageCompletionPendingFlag)) == 0 
&&
           (stateBits & InitialPumpEnabledFlag) != 0) {
@@ -403,6 +396,8 @@ import pekko.util.ByteString
         var result = engine.wrap(userInBuffer, transportOutBuffer)
 
         while (canContinueWrapping(result)) {
+          if (transportOutBuffer.remaining < transportPacketSize)
+            growTransportOutBuffer()
           val userPosition = userInBuffer.position()
           val transportPosition = transportOutBuffer.position()
           result = engine.wrap(userInBuffer, transportOutBuffer)
@@ -417,7 +412,8 @@ import pekko.util.ByteString
         result.getStatus == OK &&
         result.getHandshakeStatus == NOT_HANDSHAKING &&
         userInBuffer.hasRemaining &&
-        transportOutBuffer.remaining >= transportPacketSize
+        (transportOutBuffer.remaining >= transportPacketSize ||
+        transportOutBuffer.capacity < maxTransportOutBufferSize)
 
       @tailrec
       private def doUnwrap(ignoreOutput: Boolean): Unit = {
@@ -567,15 +563,13 @@ import pekko.util.ByteString
       }
 
       private def growTransportOutBuffer(): Unit = {
-        val oldBuffer = transportOutBuffer
-        val oldCapacity = oldBuffer.capacity()
+        val oldCapacity = transportOutBuffer.capacity()
         if (oldCapacity > Int.MaxValue / 2)
           throw new IllegalStateException(s"Cannot grow TLS transport output 
buffer beyond $oldCapacity bytes")
 
-        val bigger = acquireTransportBuffer(oldCapacity * 2)
+        val bigger = ByteBuffer.allocate(oldCapacity * 2)
         transportOutBuffer.flip()
         bigger.put(transportOutBuffer)
-        releaseTransportBuffer(oldBuffer)
         transportOutBuffer = bigger
       }
 
@@ -660,32 +654,13 @@ import pekko.util.ByteString
       }
 
       private def releaseBuffers(): Unit = {
-        releaseTransportBuffer(transportOutBuffer)
         transportOutBuffer = null
         userOutBuffer = null
-        releaseTransportBuffer(transportInBuffer)
         transportInBuffer = null
-        releaseTransportBuffer(transportUnreadBuffer)
         transportUnreadBuffer = null
         userInBuffer = null
       }
 
-      private def acquireTransportBuffer(requiredCapacity: Int): ByteBuffer = {
-        val buffer = bufferPool.acquire()
-        if (buffer.capacity >= requiredCapacity) buffer
-        else {
-          releaseTransportBuffer(buffer)
-          throw new IllegalStateException(
-            s"TLS packet buffer requires $requiredCapacity bytes but 
pekko.io.tcp.direct-buffer-size is $pooledBufferSize")
-        }
-      }
-
-      private def releaseTransportBuffer(buffer: ByteBuffer): Unit =
-        if (buffer ne null) {
-          buffer.clear()
-          bufferPool.release(buffer)
-        }
-
       private final class UserInputSlot(inlet: Inlet[SslTlsOutbound]) extends 
InHandler {
         private val pendingQueue = new Array[AnyRef](UserInputQueueSize)
         private var pendingHead = 0


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to