This is an automated email from the ASF dual-hosted git repository. He-Pin pushed a commit to branch perf-tls-jdk-heap-buffers in repository https://gitbox.apache.org/repos/asf/pekko.git
commit f3facb01993bfc0135479bdfba0692f3ad800552 Author: He-Pin <[email protected]> AuthorDate: Sun May 31 20:13:25 2026 +0800 perf(stream): heap transport buffers for the TLS GraphStage Motivation: The JDK SSLEngine works on byte[] internally and makes an extra direct<->heap copy on every wrap/unwrap when handed a direct buffer (see Netty's SslHandler, SslEngineType.JDK). TlsGraphStage took its three transport buffers from the TCP direct BufferPool, so every record paid that copy and each connection pinned ~384 KiB of direct memory for buffers that normally hold only a single ~16 KiB TLS record. Modification: Allocate the transport buffers on the heap and size them on demand like Netty: start at one packet and let growTransportOutBuffer enlarge the wrap buffer (doubling each time, and only while its capacity is below MaxApplicationRecordsPerEngineCall packets) when a larger batch is actually produced. Remove the now-unused BufferPool/Tcp wiring and the pool-derived applicationBufferSize helper. Result: Heap allocation per round trip (gc.alloc.rate.norm) drops ~5-8% for 4 KiB and 64 KiB payloads and stays roughly flat (+0.9%) for 256 B, because on-demand sizing avoids a fixed-buffer penalty; direct memory pinned by the TLS stage drops from ~384 KiB per connection (three pooled direct buffers) to zero. Functional behaviour is otherwise unchanged, except that the stage no longer depends on pekko.io.tcp.direct-buffer-size: it no longer fails with IllegalStateException when that setting is smaller than the TLS packet size, and the number of records batched per engine call is no longer limited by it. 
Tests: - stream-tests/testOnly *TlsGraphStageIsolatedSpec *TlsGraphStageEdgeCasesSpec *TlsGraphStageSpec *TlsSpec *TlsEngineSelectionSpec -> 246 succeeded, 0 failed - bench-jmh TlsBenchmark.warmRoundTrip -prof gc (graphstage), gc.alloc.rate.norm: 256B 1657->1672 B/op, 4096B 20559->18980 B/op, 65536B 301864->286199 B/op References: Refs #2878 --- .../stream/io/TlsGraphStageIsolatedSpec.scala | 11 ++++ .../pekko/stream/impl/io/TlsGraphStage.scala | 63 +++++++--------------- 2 files changed, 30 insertions(+), 44 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageIsolatedSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageIsolatedSpec.scala index 53390bef9f..72fc09311b 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageIsolatedSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageIsolatedSpec.scala @@ -234,5 +234,16 @@ class TlsGraphStageIsolatedSpec extends StreamSpec(TlsSpec.configOverrides) with outputs.foldLeft(ByteString.empty)(_ ++ _) shouldEqual expected } + + "grow the on-demand transport buffers for multi-record payloads without losing bytes" in { + // Transport buffers start at a single packet and grow on demand (heap, to avoid the JDK + // SSLEngine's direct<->heap copy). A payload spanning many records forces transportOut to grow + // past its initial size and then be reused across records; the round trip must still deliver + // every byte in order. 
+ val payloadSize = 200 * 1024 + val payload = ByteString(Array.tabulate[Byte](payloadSize)(i => (i % 251).toByte)) + + roundTrip(initSslContext("TLSv1.2"), List(payload), timeout = 30.seconds) shouldEqual payload + } } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/io/TlsGraphStage.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/io/TlsGraphStage.scala index f700db110b..40e1e0c701 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/io/TlsGraphStage.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/io/TlsGraphStage.scala @@ -30,7 +30,6 @@ import scala.util.control.NonFatal import org.apache.pekko import pekko.annotation.InternalApi -import pekko.io.{ BufferPool, Tcp } import pekko.stream._ import pekko.stream.TLSProtocol._ import pekko.stream.impl.Stages.DefaultAttributes @@ -78,12 +77,11 @@ import pekko.util.ByteString private var transportInBuffer: ByteBuffer = _ private var transportUnreadBuffer: ByteBuffer = _ private var userInBuffer: ByteBuffer = _ - private var bufferPool: BufferPool = _ - private var pooledBufferSize = 0 private var transportPacketSize = 0 private var applicationRecordSize = 0 private var maxUserInBufferSize = 0 private var maxUserOutBufferSize = 0 + private var maxTransportOutBufferSize = 0 private val userInput = new UserInputSlot(plainIn) private val transportInput = new TransportInputSlot(cipherIn) @@ -104,10 +102,6 @@ import pekko.util.ByteString override def preStart(): Unit = try { - val tcp = Tcp(materializer.system) - bufferPool = tcp.bufferPool - pooledBufferSize = tcp.Settings.DirectBufferSize - engine = createSSLEngine() currentSession = engine.getSession allocateBuffers(currentSession) @@ -135,15 +129,20 @@ import pekko.util.ByteString private def allocateBuffers(session: SSLSession): Unit = { val packetSize = math.max(1, session.getPacketBufferSize) val applicationSize = math.max(1, session.getApplicationBufferSize) - val applicationBatchSize = 
applicationBufferSize(applicationSize, packetSize) transportPacketSize = packetSize applicationRecordSize = applicationSize - maxUserInBufferSize = applicationBatchSize - maxUserOutBufferSize = applicationBatchSize - - transportOutBuffer = acquireTransportBuffer(packetSize) - transportInBuffer = acquireTransportBuffer(packetSize) - transportUnreadBuffer = acquireTransportBuffer(packetSize) + maxUserInBufferSize = applicationSize * MaxApplicationRecordsPerEngineCall + maxUserOutBufferSize = applicationSize * MaxApplicationRecordsPerEngineCall + maxTransportOutBufferSize = packetSize * MaxApplicationRecordsPerEngineCall + + // The JDK SSLEngine operates on byte[] internally and makes an extra direct<->heap copy per + // wrap/unwrap when handed a direct buffer (see Netty SslHandler's SslEngineType.JDK). Use heap + // buffers and, like Netty, size them on demand: start at a single record so small-payload or + // short-lived connections allocate little, and let growTransportOutBuffer enlarge the wrap + // buffer when a larger batch is actually produced. 
+ transportOutBuffer = ByteBuffer.allocate(packetSize) + transportInBuffer = ByteBuffer.allocate(packetSize) + transportUnreadBuffer = ByteBuffer.allocate(packetSize) userInBuffer = ByteBuffer.allocate(applicationSize) userOutBuffer = ByteBuffer.allocate(applicationSize) @@ -152,12 +151,6 @@ import pekko.util.ByteString TlsEngineHelpers.emptyReadBuffer(transportUnreadBuffer) } - private def applicationBufferSize(applicationSize: Int, packetSize: Int): Int = { - val recordsThatFitTransportBuffer = math.max(1, pooledBufferSize / packetSize) - val recordsPerEngineCall = math.min(MaxApplicationRecordsPerEngineCall, recordsThatFitTransportBuffer) - applicationSize * recordsPerEngineCall - } - private def runPump(): Unit = if ((stateBits & (StageClosedFlag | StageCompletionPendingFlag)) == 0 && (stateBits & InitialPumpEnabledFlag) != 0) { @@ -403,6 +396,8 @@ import pekko.util.ByteString var result = engine.wrap(userInBuffer, transportOutBuffer) while (canContinueWrapping(result)) { + if (transportOutBuffer.remaining < transportPacketSize) + growTransportOutBuffer() val userPosition = userInBuffer.position() val transportPosition = transportOutBuffer.position() result = engine.wrap(userInBuffer, transportOutBuffer) @@ -417,7 +412,8 @@ import pekko.util.ByteString result.getStatus == OK && result.getHandshakeStatus == NOT_HANDSHAKING && userInBuffer.hasRemaining && - transportOutBuffer.remaining >= transportPacketSize + (transportOutBuffer.remaining >= transportPacketSize || + transportOutBuffer.capacity < maxTransportOutBufferSize) @tailrec private def doUnwrap(ignoreOutput: Boolean): Unit = { @@ -567,15 +563,13 @@ import pekko.util.ByteString } private def growTransportOutBuffer(): Unit = { - val oldBuffer = transportOutBuffer - val oldCapacity = oldBuffer.capacity() + val oldCapacity = transportOutBuffer.capacity() if (oldCapacity > Int.MaxValue / 2) throw new IllegalStateException(s"Cannot grow TLS transport output buffer beyond $oldCapacity bytes") - val bigger = 
acquireTransportBuffer(oldCapacity * 2) + val bigger = ByteBuffer.allocate(oldCapacity * 2) transportOutBuffer.flip() bigger.put(transportOutBuffer) - releaseTransportBuffer(oldBuffer) transportOutBuffer = bigger } @@ -660,32 +654,13 @@ import pekko.util.ByteString } private def releaseBuffers(): Unit = { - releaseTransportBuffer(transportOutBuffer) transportOutBuffer = null userOutBuffer = null - releaseTransportBuffer(transportInBuffer) transportInBuffer = null - releaseTransportBuffer(transportUnreadBuffer) transportUnreadBuffer = null userInBuffer = null } - private def acquireTransportBuffer(requiredCapacity: Int): ByteBuffer = { - val buffer = bufferPool.acquire() - if (buffer.capacity >= requiredCapacity) buffer - else { - releaseTransportBuffer(buffer) - throw new IllegalStateException( - s"TLS packet buffer requires $requiredCapacity bytes but pekko.io.tcp.direct-buffer-size is $pooledBufferSize") - } - } - - private def releaseTransportBuffer(buffer: ByteBuffer): Unit = - if (buffer ne null) { - buffer.clear() - bufferPool.release(buffer) - } - private final class UserInputSlot(inlet: Inlet[SslTlsOutbound]) extends InHandler { private val pendingQueue = new Array[AnyRef](UserInputQueueSize) private var pendingHead = 0 --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
