This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 29d74ecb74 Optimize two-part ByteString concatenation (#2924)
29d74ecb74 is described below

commit 29d74ecb74b26bb0ddcf9c0fa7258f489434a585
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat May 9 22:56:35 2026 +0800

    Optimize two-part ByteString concatenation (#2924)
    
    * feat: optimize two-part ByteString concatenation
    
    Motivation:
    Small ByteString concatenation currently promotes two fragments into
    ByteStrings backed by a Vector, which adds allocation and hurts hot paths
    that append a header and payload before reading or copying.
    
    Modification:
    Add an internal two-fragment ByteString representation for simple non-empty
    fragments, flatten it when appending beyond two fragments, and add focused
    JMH coverage for append, read, copy, and cross-boundary header reads.
    
    Result:
    JMH shows appendTwo allocation dropping from 160 B/op on main to 32 B/op on
    this branch, with improved read and copy throughput; ByteStringSpec and MiMa
    checks pass.
    
    * fix: bind ByteString2 serializer
    
    * fix: Address ByteString2 review comments
    
    * fix: tighten ByteString2 invariants
    
    * test: cover ByteString2 serialization paths
    
    * perf: avoid LazyList in ByteString2.iterator + cross-check tests + bench
    
    Motivation:
    ByteString2.iterator built MultiByteArrayIterator from `LazyList(...)`. That
    allocates two cons cells plus two thunk closures per iterator, and each call
    to MultiByteArrayIterator.normalize() walks the LinearSeq and forces a thunk.
    For the gRPC-shaped "5-byte header ++ payload" hot path the iterator+read
    overhead dominated, so reading via `iterator.getInt` on the result was
    ~30x slower than the equivalent `ByteString1C` baseline.
    
    Modification:
    * ByteString2.iterator now uses a plain two-element `List`: the same shape
      that MultiByteArrayIterator already accepts (LinearSeq[ByteArrayIterator]),
      but with no thunks and half the allocations.
    * Add ByteString_concat3way_Benchmark: directional comparison of the two
      candidate strategies for the hot path (`copy` via fromArrayUnsafe vs
      `bs2` via ByteString2.apply) across realistic gRPC payload sizes
      (64B / 1KB / 16KB / 64KB / 256KB), covering concat alone, concat+iterator
      reads, concat+readIntBE, copyToBuffer, asByteBuffers, drop(5)/take(5),
      and the same operations on pre-built results.
    * Add a cross-check test in ByteStringSpec that builds the same logical bytes
      as `ByteString1C` (reference) and as `ByteString2` with every possible
      split point, then asserts equivalence across `apply`, full byte-by-byte
      iteration, `iterator.getShort/Int/Long` (BE+LE) at every offset,
      `readShortBE/LE`, `readIntBE/LE`, `readLongBE/LE`, `take/drop/takeRight/
      dropRight/slice`, `copyToBuffer`, `copyToArray`, `indexOf`, `compact`,
      equality and hashCode.
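The LazyList-to-List change can be illustrated with a stand-alone iterator. The `MultiArrayIterator` below is a hypothetical sketch, not Pekko's `ByteIterator.MultiByteArrayIterator`. It keeps its fragments as `(array, from, until)` triples in a strict `List`, so moving to the next fragment is a plain `tail` with no thunk to force. The accompanying check mirrors the new cross-check test: it compares the result against the contiguous bytes at every split point.

```scala
// Hypothetical sketch: an iterator over (array, from, until) windows held in a
// strict immutable List. Unlike LazyList there are no thunks to force, so
// advancing to the next fragment is just `tail`.
final class MultiArrayIterator(private var frags: List[(Array[Byte], Int, Int)])
    extends Iterator[Byte] {
  private var pos: Int = 0 // absolute index into the head fragment's array

  // Drop exhausted (or empty) fragments so that `frags.head` always has data.
  private def normalize(): Unit =
    while (frags.nonEmpty && pos >= frags.head._3) {
      frags = frags.tail
      if (frags.nonEmpty) pos = frags.head._2
    }

  if (frags.nonEmpty) pos = frags.head._2
  normalize()

  def hasNext: Boolean = frags.nonEmpty

  def next(): Byte = {
    if (!hasNext) throw new NoSuchElementException("next on empty iterator")
    val b = frags.head._1(pos)
    pos += 1
    normalize()
    b
  }
}
```

For two fragments, the caller builds the list as `(a, 0, a.length) :: (b, 0, b.length) :: Nil`. That is two cons cells and no closures.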
    
    Result:
    JMH on JDK 25 G1GC, single fork, 5x1s measurement (ops/us, higher is better):
    
      benchmark             size      before     after     speedup
      downRead_bs2          64        16.4      730.8      44x
      downRead_bs2          1024      27.2      730.8      27x
      downRead_bs2          16384     25.7      727.5      28x
      downRead_bs2          65536     24.4      731.0      30x
      downRead_bs2          262144    18.4      730.6      40x
      concatRead_bs2        64        27.2      616.0      23x
      concatRead_bs2        1024      16.2      609.5      38x
      concatRead_bs2        16384     21.5      608.8      28x
      concatRead_bs2        65536     22.1      616.7      28x
      concatRead_bs2        262144    25.7      612.8      24x
    
    The previously dominant iterator overhead is gone: ByteString2 read paths
    are now within ~1.6x of the contiguous `ByteString1C` baseline (down from
    30-50x slower), while concat-and-read is several times faster than the
    copy path because we avoid the System.arraycopy. All 212 ByteStringSpec
    tests pass including the new cross-check.
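Direct reads can stay close to the contiguous baseline partly because a fixed-width read usually lands entirely inside one fragment, and no bytes ever need to be copied. The `TwoFragments` class below is a hypothetical sketch of that idea. It is not the `ByteString2.readIntBE` implementation, which is not shown in this excerpt. The sketch takes a single-array fast path when it can and only assembles bytes individually when the read straddles the fragment boundary.

```scala
// Hypothetical sketch (not Pekko's ByteString2): read a big-endian Int from two
// fragments by direct indexing, with no iterator allocation and no copying.
final class TwoFragments(first: Array[Byte], second: Array[Byte]) {
  val length: Int = first.length + second.length

  // Map a logical index to the fragment that holds it.
  def byteAt(i: Int): Byte =
    if (i < first.length) first(i) else second(i - first.length)

  def readIntBE(offset: Int): Int = {
    if (offset < 0 || offset > length - 4)
      throw new IndexOutOfBoundsException(s"offset $offset, length $length")
    if (offset + 4 <= first.length) intBE(first, offset) // entirely in first
    else if (offset >= first.length) intBE(second, offset - first.length) // entirely in second
    else // straddles the boundary: assemble byte by byte
      ((byteAt(offset) & 0xFF) << 24) | ((byteAt(offset + 1) & 0xFF) << 16) |
      ((byteAt(offset + 2) & 0xFF) << 8) | (byteAt(offset + 3) & 0xFF)
  }

  private def intBE(a: Array[Byte], i: Int): Int =
    ((a(i) & 0xFF) << 24) | ((a(i + 1) & 0xFF) << 16) | ((a(i + 2) & 0xFF) << 8) | (a(i + 3) & 0xFF)
}
```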
    
    * Optimize ByteString2: add copyToBuffer(offset), map, and SWAR-based startsWith/endsWith (#2950)
    
    - Add private[pekko] matchesAt to the abstract ByteString class (default byte-by-byte impl)
    - Override matchesAt in ByteString1C and ByteString1 with SWAR 8-byte comparisons
    - Add optimized copyToBuffer(buffer, offset) to ByteString2 (avoids drop() allocation)
    - Improve startsWith in ByteString2 to use matchesAt (SWAR-based) instead of byte-by-byte byteAt
    - Improve endsWith in ByteString2 to use matchesAt (SWAR-based) instead of byte-by-byte byteAt
    - Add optimized map to ByteString2 (pre-allocated array + byteAtUnchecked while-loop)
    - foreach in ByteString2 is already optimal (delegates to first/second fragments)
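SWAR (SIMD within a register) comparison here means checking 8 bytes at once as a single `Long`, then finishing the remainder byte by byte. The sketch below is a hypothetical stand-alone version of a `matchesAt`-style helper. The contents of the new `SWARUtil.scala` are not shown in this excerpt. Reading the `Long`s through `java.nio.ByteBuffer` is an assumption chosen for portability, not necessarily what Pekko does.

```scala
import java.nio.{ ByteBuffer, ByteOrder }

// Hypothetical SWAR-style matchesAt (not Pekko's implementation): compare 8 bytes
// per step as Longs, then the tail byte by byte. Bounds are assumed pre-checked.
object SwarMatch {
  private def longAt(a: Array[Byte], i: Int): Long =
    ByteBuffer.wrap(a).order(ByteOrder.LITTLE_ENDIAN).getLong(i)

  def matchesAt(haystack: Array[Byte], hOff: Int, needle: Array[Byte], nOff: Int, len: Int): Boolean = {
    var h = hOff
    var n = nOff
    var remaining = len
    // 8-byte chunks: one Long comparison replaces eight Byte comparisons.
    while (remaining >= 8) {
      if (longAt(haystack, h) != longAt(needle, n)) return false
      h += 8; n += 8; remaining -= 8
    }
    // Tail: fewer than 8 bytes left.
    while (remaining > 0) {
      if (haystack(h) != needle(n)) return false
      h += 1; n += 1; remaining -= 1
    }
    true
  }
}
```

Byte order does not affect an equality check, because both sides are read the same way. A production version would likely avoid wrapping a new `ByteBuffer` on every call.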
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko/sessions/cf258ced-c5f8-4bef-8ec4-3de15df750d3
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
    
    ---------
    
    Co-authored-by: PJ Fanning <[email protected]>
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../PrimitivesSerializationSpec.scala              |  34 ++
 .../apache/pekko/serialization/SerializeSpec.scala |   4 +
 .../org/apache/pekko/util/ByteStringSpec.scala     | 203 +++++++-
 actor/src/main/resources/reference.conf            |   1 +
 .../scala/org/apache/pekko/util/ByteString.scala   | 536 ++++++++++++++++++++-
 .../scala/org/apache/pekko/util/SWARUtil.scala     |  50 ++
 .../pekko/util/ByteString_append_Benchmark.scala   |  48 ++
 .../util/ByteString_concat3way_Benchmark.scala     | 180 +++++++
 8 files changed, 1042 insertions(+), 14 deletions(-)

diff --git a/actor-tests/src/test/scala/org/apache/pekko/serialization/PrimitivesSerializationSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/serialization/PrimitivesSerializationSpec.scala
index 0fdbdd44d5..b03861dd60 100644
--- a/actor-tests/src/test/scala/org/apache/pekko/serialization/PrimitivesSerializationSpec.scala
+++ b/actor-tests/src/test/scala/org/apache/pekko/serialization/PrimitivesSerializationSpec.scala
@@ -21,6 +21,7 @@ import scala.util.Random
 import org.apache.pekko
 import pekko.testkit.PekkoSpec
 import pekko.util.ByteString
+import pekko.util.ByteString.ByteString2
 
 import com.typesafe.config.ConfigFactory
 
@@ -61,6 +62,12 @@ class PrimitivesSerializationSpec extends PekkoSpec(PrimitivesSerializationSpec.
     serializer.fromBinary(buffer, "") should ===(msg)
   }
 
+  def expectByteString2(byteString: ByteString): ByteString2 =
+    byteString match {
+      case byteString2: ByteString2 => byteString2
+      case other                    => fail(s"Expected ByteString2, got [${other.getClass.getName}]")
+    }
+
   "LongSerializer" must {
     Seq(0L, 1L, -1L, Long.MinValue, Long.MinValue + 1L, Long.MaxValue, Long.MaxValue - 1L)
       .map(_.asInstanceOf[AnyRef])
@@ -155,9 +162,13 @@ class PrimitivesSerializationSpec extends PekkoSpec(PrimitivesSerializationSpec.
   }
 
   "ByteStringSerializer" must {
+    val twoPartByteString =
+      expectByteString2(ByteString(Array[Byte](1, 2)) ++ ByteString(Array[Byte](3, 4, 5)))
+
     Seq(
       "empty string" -> ByteString.empty,
       "simple content" -> ByteString("hello"),
+      "two-part content" -> twoPartByteString,
       "concatenated content" -> (ByteString("hello") ++ ByteString("world")),
       "sliced content" -> ByteString("helloabc").take(5),
       "large concatenated" ->
@@ -177,6 +188,29 @@ class PrimitivesSerializationSpec extends PekkoSpec(PrimitivesSerializationSpec.
         }
     }
 
+    "serialize ByteString2 as contiguous bytes" in {
+      val serializer = serialization.serializerFor(twoPartByteString.getClass)
+
+      ByteString(serializer.toBinary(twoPartByteString)) should ===(ByteString(Array[Byte](1, 2, 3, 4, 5)))
+    }
+
+    "de-serialize bytes to an equal ByteString" in {
+      val serializer = serialization.serializerFor(twoPartByteString.getClass)
+
+      serializer.fromBinary(Array[Byte](1, 2, 3, 4, 5), None) should ===(twoPartByteString)
+    }
+
+    "serialize and de-serialize ByteString2 using ByteBuffers" in {
+      val serializer =
+        
serialization.serializerFor(twoPartByteString.getClass).asInstanceOf[Serializer 
with ByteBufferSerializer]
+
+      buffer.clear()
+      serializer.toBinary(twoPartByteString, buffer)
+      buffer.flip()
+
+      serializer.fromBinary(buffer, "") should ===(twoPartByteString)
+    }
+
     "have right serializer id" in {
       // checking because moved to pekko-actor
       serialization.serializerFor(1L.asInstanceOf[AnyRef].getClass).identifier === 21
diff --git a/actor-tests/src/test/scala/org/apache/pekko/serialization/SerializeSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/serialization/SerializeSpec.scala
index 22c4321bdd..165d0e09ba 100644
--- a/actor-tests/src/test/scala/org/apache/pekko/serialization/SerializeSpec.scala
+++ b/actor-tests/src/test/scala/org/apache/pekko/serialization/SerializeSpec.scala
@@ -408,6 +408,7 @@ class ReferenceSerializationSpec extends PekkoSpec(SerializationTests.mostlyRefe
       serializerMustBe(classOf[String], classOf[StringSerializer])
       serializerMustBe(classOf[ByteString.ByteString1], classOf[ByteStringSerializer])
       serializerMustBe(classOf[ByteString.ByteString1C], classOf[ByteStringSerializer])
+      serializerMustBe(classOf[ByteString.ByteString2], classOf[ByteStringSerializer])
       serializerMustBe(classOf[ByteString.ByteStrings], classOf[ByteStringSerializer])
 
     }
@@ -453,6 +454,9 @@ class AllowJavaSerializationSpec extends PekkoSpec(SerializationTests.allowJavaS
       serializerMustBe(classOf[java.lang.Integer], classOf[IntSerializer])
       serializerMustBe(classOf[String], classOf[StringSerializer])
       serializerMustBe(classOf[ByteString.ByteString1], classOf[ByteStringSerializer])
+      serializerMustBe(classOf[ByteString.ByteString1C], classOf[ByteStringSerializer])
+      serializerMustBe(classOf[ByteString.ByteString2], classOf[ByteStringSerializer])
+      serializerMustBe(classOf[ByteString.ByteStrings], classOf[ByteStringSerializer])
     }
 
     "not support serialization for other classes" in {
diff --git a/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringSpec.scala
index c596dcad6e..a5066552d8 100644
--- a/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringSpec.scala
+++ b/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringSpec.scala
@@ -30,7 +30,7 @@ import org.scalatestplus.scalacheck.Checkers
 
 import org.apache.pekko
 import pekko.io.UnsynchronizedByteArrayInputStream
-import pekko.util.ByteString.{ ByteString1, ByteString1C, ByteStrings }
+import pekko.util.ByteString.{ ByteString1, ByteString1C, ByteString2, ByteStrings }
 
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpec
@@ -106,6 +106,18 @@ class ByteStringSpec extends AnyWordSpec with Matchers with Checkers {
     deserialize(serialize(obj)) == obj
   }
 
+  def expectByteString2(byteString: ByteString): ByteString2 =
+    byteString match {
+      case byteString2: ByteString2 => byteString2
+      case other                    => fail(s"Expected ByteString2, got [${other.getClass.getName}]")
+    }
+
+  def expectByteStrings(byteString: ByteString): ByteStrings =
+    byteString match {
+      case byteStrings: ByteStrings => byteStrings
+      case other                    => fail(s"Expected ByteStrings, got [${other.getClass.getName}]")
+    }
+
   def hexFromSer(obj: AnyRef) = {
     val os = new ByteArrayOutputStream
     val bos = new ObjectOutputStream(os)
@@ -1749,6 +1761,35 @@ class ByteStringSpec extends AnyWordSpec with Matchers with Checkers {
       "dropping" in { check((a: ByteString, b: ByteString) => (a ++ b).drop(a.size) == b) }
     }
 
+    "preserve concatenation direction" when {
+      "creating ByteString2 from two simple fragments" in {
+        val compact = ByteString1C(Array[Byte](1, 2))
+        val sliced = ByteString1(Array[Byte](0, 3, 4, 5), 1, 2)
+
+        val forward = expectByteString2(compact ++ sliced)
+        forward should ===(ByteString(Array[Byte](1, 2, 3, 4)))
+        forward.asByteBuffers.map(_.remaining()).toSeq should ===(Seq(2, 2))
+
+        val reverse = expectByteString2(sliced ++ compact)
+        reverse should ===(ByteString(Array[Byte](3, 4, 1, 2)))
+        reverse.asByteBuffers.map(_.remaining()).toSeq should ===(Seq(2, 2))
+      }
+
+      "promoting ByteString2 to ByteStrings when prepending or appending another fragment" in {
+        val middle = expectByteString2(ByteString1C(Array[Byte](2, 3)) ++ ByteString1C(Array[Byte](4, 5)))
+        val prefix = ByteString1C(Array[Byte](1))
+        val suffix = ByteString1C(Array[Byte](6))
+
+        val prepended = expectByteStrings(prefix ++ middle)
+        prepended should ===(ByteString(Array[Byte](1, 2, 3, 4, 5)))
+
+        val appended = expectByteStrings(middle ++ suffix)
+        appended should ===(ByteString(Array[Byte](2, 3, 4, 5, 6)))
+
+        expectByteStrings(prepended ++ suffix) should ===(ByteString(Array[Byte](1, 2, 3, 4, 5, 6)))
+      }
+    }
+
     "be equal to the original" when {
       "compacting" in {
         check { (xs: ByteString) =>
@@ -2038,6 +2079,28 @@ class ByteStringSpec extends AnyWordSpec with Matchers with Checkers {
         deserialize(serialize(bs1)) shouldEqual bs1
         deserialize(serialize(bss)) shouldEqual bss
       }
+
+      "given ByteString2" in {
+        val original = expectByteString2(ByteString1C(Array[Byte](1, 2, 3)) ++ ByteString1C(Array[Byte](4, 5)))
+
+        deserialize(serialize(original)) match {
+          case deserialized: ByteString2 => deserialized shouldEqual original
+          case other                     => fail(s"Expected deserialized ByteString2, got [${other.getClass.getName}]")
+        }
+      }
+
+      "given ByteString2 with sliced fragments" in {
+        val left = ByteString1(Array[Byte](0, 1, 2, 3), 1, 2)
+        val right = ByteString1(Array[Byte](4, 5, 6, 0), 0, 3)
+        val original = expectByteString2(left ++ right)
+
+        deserialize(serialize(original)) match {
+          case deserialized: ByteString2 =>
+            deserialized shouldEqual ByteString(Array[Byte](1, 2, 4, 5, 6))
+            deserialized.asByteBuffers.map(_.remaining()).toSeq should ===(Seq(2, 3))
+          case other => fail(s"Expected deserialized ByteString2, got [${other.getClass.getName}]")
+        }
+      }
     }
 
     "unsafely wrap and unwrap bytes" in {
@@ -2122,6 +2185,18 @@ class ByteStringSpec extends AnyWordSpec with Matchers with Checkers {
       byteStrings.readLongLE(8) should ===(0x100F0E0D0C0B0A09L)
     }
 
+    "read primitive values across ByteString2 fragment boundaries" in {
+      val byteString2 =
+        expectByteString2(ByteString1C(Array[Byte](1, 2, 3)) ++ ByteString1C(Array[Byte](4, 5, 6, 7, 8)))
+
+      byteString2.readShortBE(2) should ===(0x0304.toShort)
+      byteString2.readShortLE(2) should ===(0x0403.toShort)
+      byteString2.readIntBE(1) should ===(0x02030405)
+      byteString2.readIntLE(1) should ===(0x05040302)
+      byteString2.readLongBE(0) should ===(0x0102030405060708L)
+      byteString2.readLongLE(0) should ===(0x0807060504030201L)
+    }
+
     "throw IndexOutOfBoundsException for readShortBE/LE with insufficient data" in {
       val bs1C = ByteString1C(Array[Byte](1))
       an[IndexOutOfBoundsException] should be thrownBy bs1C.readShortBE(0)
@@ -2343,6 +2418,132 @@ class ByteStringSpec extends AnyWordSpec with Matchers with Checkers {
       noException should be thrownBy builder.sizeHint(0)
       builder.result() should ===(ByteString(42.toByte))
     }
+
+    "ByteString2 must behave exactly like ByteString1C across every read/transform op" in {
+      // Build identical bytes as ByteString1C (reference) and as ByteString2 at every split point;
+      // every public read or transform op MUST return the same value. This guards against regressions
+      // in the iterator/normalize path (e.g. the LazyList -> List fix on ByteString2.iterator).
+      val totalSize = 23 // odd, large enough to exercise short/int/long reads at every fragment boundary
+      val bytes = Array.tabulate[Byte](totalSize)(i => ((i * 31 + 7) & 0xFF).toByte)
+      val reference = ByteString1C(bytes)
+
+      // Try every possible non-empty split: bs2 has fragments [0, split) ++ [split, totalSize)
+      for (split <- 1 until totalSize) {
+        val left = ByteString1C(java.util.Arrays.copyOfRange(bytes, 0, split))
+        val right = ByteString1C(java.util.Arrays.copyOfRange(bytes, split, totalSize))
+        val bs2 = expectByteString2(left ++ right)
+
+        withClue(s"split=$split: ") {
+          // basic contract
+          bs2.size should ===(reference.size)
+          bs2 should ===(reference)
+          bs2.hashCode should ===(reference.hashCode)
+          bs2.asByteBuffers.map(_.remaining()).toSeq should ===(Seq(split, totalSize - split))
+
+          // apply(idx) at every index
+          for (i <- 0 until totalSize) bs2(i) should ===(reference(i))
+
+          // iterator: byte-by-byte equality with reference
+          val refIt = reference.iterator
+          val bs2It = bs2.iterator
+          var idx = 0
+          while (refIt.hasNext) {
+            withClue(s"byte at $idx: ") { bs2It.next() should ===(refIt.next()) }
+            idx += 1
+          }
+          bs2It.hasNext should ===(false)
+
+          // iterator.getShort/getInt/getLong at every valid starting offset, BE and LE
+          for (off <- 0 to totalSize - 2) {
+            val r = reference.iterator.drop(off)
+            val b = bs2.iterator.drop(off)
+            r.getShort(BIG_ENDIAN) should ===(b.getShort(BIG_ENDIAN))
+
+            val r2 = reference.iterator.drop(off)
+            val b2 = bs2.iterator.drop(off)
+            r2.getShort(LITTLE_ENDIAN) should ===(b2.getShort(LITTLE_ENDIAN))
+          }
+          for (off <- 0 to totalSize - 4) {
+            val r = reference.iterator.drop(off)
+            val b = bs2.iterator.drop(off)
+            r.getInt(BIG_ENDIAN) should ===(b.getInt(BIG_ENDIAN))
+
+            val r2 = reference.iterator.drop(off)
+            val b2 = bs2.iterator.drop(off)
+            r2.getInt(LITTLE_ENDIAN) should ===(b2.getInt(LITTLE_ENDIAN))
+          }
+          for (off <- 0 to totalSize - 8) {
+            val r = reference.iterator.drop(off)
+            val b = bs2.iterator.drop(off)
+            r.getLong(BIG_ENDIAN) should ===(b.getLong(BIG_ENDIAN))
+
+            val r2 = reference.iterator.drop(off)
+            val b2 = bs2.iterator.drop(off)
+            r2.getLong(LITTLE_ENDIAN) should ===(b2.getLong(LITTLE_ENDIAN))
+          }
+
+          // direct readShort/Int/Long at every valid offset, BE and LE
+          for (off <- 0 to totalSize - 2) {
+            bs2.readShortBE(off) should ===(reference.readShortBE(off))
+            bs2.readShortLE(off) should ===(reference.readShortLE(off))
+          }
+          for (off <- 0 to totalSize - 4) {
+            bs2.readIntBE(off) should ===(reference.readIntBE(off))
+            bs2.readIntLE(off) should ===(reference.readIntLE(off))
+          }
+          for (off <- 0 to totalSize - 8) {
+            bs2.readLongBE(off) should ===(reference.readLongBE(off))
+            bs2.readLongLE(off) should ===(reference.readLongLE(off))
+          }
+
+          // take/drop/slice at every cut, including fragment boundary
+          for (n <- 0 to totalSize) {
+            bs2.take(n) should ===(reference.take(n))
+            bs2.drop(n) should ===(reference.drop(n))
+            bs2.takeRight(n) should ===(reference.takeRight(n))
+            bs2.dropRight(n) should ===(reference.dropRight(n))
+          }
+          for {
+            from <- 0 to totalSize
+            until <- from to totalSize
+          } bs2.slice(from, until) should ===(reference.slice(from, until))
+
+          // copyToBuffer
+          val refBuf = ByteBuffer.allocate(totalSize)
+          val bs2Buf = ByteBuffer.allocate(totalSize)
+          val refCopied = reference.copyToBuffer(refBuf)
+          val bs2Copied = bs2.copyToBuffer(bs2Buf)
+          bs2Copied should ===(refCopied)
+          refBuf.flip(); bs2Buf.flip()
+          bs2Buf should ===(refBuf)
+
+          // copyToArray with various (start, len) windows
+          for {
+            start <- 0 to totalSize
+            len <- 0 to totalSize - start
+          } {
+            val refArr = new Array[Byte](totalSize)
+            val bs2Arr = new Array[Byte](totalSize)
+            val nRef = reference.copyToArray(refArr, start, len)
+            val nBs2 = bs2.copyToArray(bs2Arr, start, len)
+            nBs2 should ===(nRef)
+            bs2Arr.toSeq should ===(refArr.toSeq)
+          }
+
+          // indexOf for every distinct byte value present, plus a few absent
+          val present = bytes.toSet
+          val candidates = present ++ Set[Byte](Byte.MinValue, 0.toByte, Byte.MaxValue)
+          for (b <- candidates) {
+            bs2.indexOf(b) should ===(reference.indexOf(b))
+            for (from <- 0 to totalSize) bs2.indexOf(b, from) should ===(reference.indexOf(b, from))
+          }
+
+          // compact yields equivalent ByteString1C
+          bs2.compact should ===(reference)
+          bs2.compact.isCompact should ===(true)
+        }
+      }
+    }
   }
 
   "A ByteStringIterator" must {
diff --git a/actor/src/main/resources/reference.conf b/actor/src/main/resources/reference.conf
index 5474a38432..5b4add60f3 100644
--- a/actor/src/main/resources/reference.conf
+++ b/actor/src/main/resources/reference.conf
@@ -853,6 +853,7 @@ pekko {
       "java.lang.String" = primitive-string
       "org.apache.pekko.util.ByteString$ByteString1C" = primitive-bytestring
       "org.apache.pekko.util.ByteString$ByteString1" = primitive-bytestring
+      "org.apache.pekko.util.ByteString$ByteString2" = primitive-bytestring
       "org.apache.pekko.util.ByteString$ByteStrings" = primitive-bytestring
       "java.lang.Long" = primitive-long
       "scala.Long" = primitive-long
diff --git a/actor/src/main/scala/org/apache/pekko/util/ByteString.scala b/actor/src/main/scala/org/apache/pekko/util/ByteString.scala
index 0d4984a2fa..44db0e80c7 100644
--- a/actor/src/main/scala/org/apache/pekko/util/ByteString.scala
+++ b/actor/src/main/scala/org/apache/pekko/util/ByteString.scala
@@ -187,6 +187,8 @@ object ByteString {
   final class ByteString1C private (private val bytes: Array[Byte]) extends CompactByteString {
     def apply(idx: Int): Byte = bytes(idx)
 
+    private[pekko] override def byteAtUnchecked(offset: Int): Byte = bytes(offset)
+
     override def length: Int = bytes.length
 
     // Avoid `iterator` in performance sensitive code, call ops directly on ByteString instead
@@ -217,7 +219,13 @@ object ByteString {
     override def ++(that: ByteString): ByteString = {
       if (that.isEmpty) this
       else if (this.isEmpty) that
-      else toByteString1 ++ that
+      else
+        that match {
+          case b: ByteString1C => ByteString2.fromNonEmptySimpleFragments(this, b)
+          case b: ByteString1  => ByteString2.fromNonEmptySimpleFragments(this, b)
+          case bs: ByteString2 => ByteStrings(this, bs)
+          case bs: ByteStrings => ByteStrings(toByteString1, bs)
+        }
     }
 
     override def take(n: Int): ByteString =
@@ -406,7 +414,8 @@ object ByteString {
      * INTERNAL API: compare `len` bytes from this ByteString starting at `haystackOffset`
      * against `needle[needleOffset..needleOffset+len)`.
      */
-    private[pekko] def matchesAt(haystackOffset: Int, needle: Array[Byte], needleOffset: Int, len: Int): Boolean = {
+    private[pekko] override def matchesAt(
+        haystackOffset: Int, needle: Array[Byte], needleOffset: Int, len: Int): Boolean = {
       var hIdx = haystackOffset
       var nIdx = needleOffset
       var remaining = len
@@ -518,6 +527,8 @@ object ByteString {
 
     def apply(idx: Int): Byte = bytes(checkRangeConvert(idx))
 
+    private[pekko] override def byteAtUnchecked(offset: Int): Byte = bytes(startIndex + offset)
+
     // Avoid `iterator` in performance sensitive code, call ops directly on ByteString instead
     override def iterator: ByteIterator.ByteArrayIterator =
       ByteIterator.ByteArrayIterator(bytes, startIndex, startIndex + length)
@@ -633,11 +644,12 @@ object ByteString {
       else if (this.isEmpty) that
       else
         that match {
-          case b: ByteString1C => ByteStrings(this, b.toByteString1)
+          case b: ByteString1C => ByteString2.fromNonEmptySimpleFragments(this, b)
           case b: ByteString1  =>
             if ((bytes eq b.bytes) && (startIndex + length == b.startIndex))
               new ByteString1(bytes, startIndex, length + b.length)
-            else ByteStrings(this, b)
+            else ByteString2.fromNonEmptySimpleFragments(this, b)
+          case bs: ByteString2 => ByteStrings(this, bs)
           case bs: ByteStrings => ByteStrings(this, bs)
         }
     }
@@ -818,7 +830,8 @@ object ByteString {
      * INTERNAL API: compare `len` bytes from this ByteString starting at logical `haystackOffset`
      * against `needle[needleOffset..needleOffset+len)`.
      */
-    private[pekko] def matchesAt(haystackOffset: Int, needle: Array[Byte], needleOffset: Int, len: Int): Boolean = {
+    private[pekko] override def matchesAt(
+        haystackOffset: Int, needle: Array[Byte], needleOffset: Int, len: Int): Boolean = {
       var hIdx = startIndex + haystackOffset
       var nIdx = needleOffset
       var remaining = len
@@ -886,17 +899,35 @@ object ByteString {
 
   private[pekko] object ByteStrings extends Companion {
     def apply(bytestrings: Vector[ByteString1]): ByteString =
-      new ByteStrings(bytestrings, bytestrings.foldLeft(0)(_ + _.length))
-
-    def apply(bytestrings: Vector[ByteString1], length: Int): ByteString = new ByteStrings(bytestrings, length)
+      apply(bytestrings, bytestrings.foldLeft(0)(_ + _.length))
+
+    def apply(bytestrings: Vector[ByteString1], length: Int): ByteString =
+      bytestrings.length match {
+        case 0 =>
+          assert(length == 0, s"empty ByteStrings length must be 0, was $length")
+          ByteString.empty
+        case 1 =>
+          val only = bytestrings.head
+          assert(only.length == length, s"single ByteString length ${only.length} did not match $length")
+          only
+        case 2 => ByteString2(bytestrings(0), bytestrings(1), length)
+        case _ => new ByteStrings(bytestrings, length)
+      }
 
     def apply(b1: ByteString1, b2: ByteString1): ByteString = compare(b1, b2) match {
-      case 3 => new ByteStrings(Vector(b1, b2), b1.length + b2.length)
+      case 3 => ByteString2(b1, b2)
       case 2 => b2
       case 1 => b1
       case 0 => ByteString.empty
     }
 
+    def apply(b: ByteString, bs: ByteString2): ByteString = compare(b, bs) match {
+      case 3 => new ByteStrings(addFragments(b, bs), b.length + bs.length)
+      case 2 => bs
+      case 1 => b
+      case 0 => ByteString.empty
+    }
+
     def apply(b: ByteString1, bs: ByteStrings): ByteString = compare(b, bs) match {
       case 3 => new ByteStrings(b +: bs.bytestrings, bs.length + b.length)
       case 2 => bs
@@ -904,6 +935,13 @@ object ByteString {
       case 0 => ByteString.empty
     }
 
+    def apply(bs: ByteString2, b: ByteString): ByteString = compare(bs, b) match {
+      case 3 => new ByteStrings(addFragments(bs, b), bs.length + b.length)
+      case 2 => b
+      case 1 => bs
+      case 0 => ByteString.empty
+    }
+
     def apply(bs: ByteStrings, b: ByteString1): ByteString = compare(bs, b) match {
       case 3 => new ByteStrings(bs.bytestrings :+ b, bs.length + b.length)
       case 2 => b
@@ -911,6 +949,27 @@ object ByteString {
       case 0 => ByteString.empty
     }
 
+    def apply(bs: ByteStrings, b: ByteString2): ByteString = compare(bs, b) match {
+      case 3 => new ByteStrings(addFragments(bs, b), bs.length + b.length)
+      case 2 => b
+      case 1 => bs
+      case 0 => ByteString.empty
+    }
+
+    def apply(b: ByteString2, bs: ByteStrings): ByteString = compare(b, bs) match {
+      case 3 => new ByteStrings(addFragments(b, bs), b.length + bs.length)
+      case 2 => bs
+      case 1 => b
+      case 0 => ByteString.empty
+    }
+
+    def apply(b1: ByteString2, b2: ByteString2): ByteString = compare(b1, b2) match {
+      case 3 => new ByteStrings(addFragments(b1, b2), b1.length + b2.length)
+      case 2 => b2
+      case 1 => b1
+      case 0 => ByteString.empty
+    }
+
     def apply(bs1: ByteStrings, bs2: ByteStrings): ByteString = compare(bs1, bs2) match {
       case 3 => new ByteStrings(bs1.bytestrings ++ bs2.bytestrings, bs1.length + bs2.length)
       case 2 => bs2
@@ -925,9 +984,24 @@ object ByteString {
       else if (b2.isEmpty) 1
       else 3
 
+    private[ByteString] def addFragments(first: ByteString, second: ByteString): Vector[ByteString1] = {
+      val builder = new VectorBuilder[ByteString1]
+      addFragments(builder, first)
+      addFragments(builder, second)
+      builder.result()
+    }
+
+    private[ByteString] def addFragments(builder: VectorBuilder[ByteString1], byteString: ByteString): Unit =
+      byteString match {
+        case b: ByteString1C => builder += b.toByteString1
+        case b: ByteString1  => builder += b
+        case b: ByteString2  => b.addFragmentsTo(builder)
+        case b: ByteStrings  => builder ++= b.bytestrings
+      }
+
     val SerializationIdentity = 2.toByte
 
-    def readFromInputStream(is: ObjectInputStream): ByteStrings = {
+    def readFromInputStream(is: ObjectInputStream): ByteString = {
       val nByteStrings = is.readInt()
 
       val builder = new VectorBuilder[ByteString1]
@@ -942,10 +1016,418 @@ object ByteString {
         length += bs.length
         i += 1
       }
-      new ByteStrings(builder.result(), length)
+      ByteStrings(builder.result(), length)
     }
   }
 
+  private[pekko] object ByteString2 extends Companion {
+    val SerializationIdentity = 3.toByte
+
+    private[ByteString] def fromNonEmptySimpleFragments(first: ByteString, second: ByteString): ByteString =
+      fromNonEmptySimpleFragments(first, second, first.length + second.length)
+
+    private[ByteString] def fromNonEmptySimpleFragments(
+        first: ByteString,
+        second: ByteString,
+        length: Int): ByteString =
+      new ByteString2(first, second, length)
+
+    def apply(first: ByteString, second: ByteString): ByteString =
+      apply(first, second, first.length + second.length)
+
+    private[ByteString] def apply(first: ByteString, second: ByteString, length: Int): ByteString =
+      ByteStrings.compare(first, second) match {
+        case 3 =>
+          if (isSimpleFragment(first) && isSimpleFragment(second))
+            fromNonEmptySimpleFragments(first, second, length)
+          else
+            ByteStrings(ByteStrings.addFragments(first, second), length)
+        case 2 =>
+          assert(second.length == length, s"second ByteString length ${second.length} did not match $length")
+          second
+        case 1 =>
+          assert(first.length == length, s"first ByteString length ${first.length} did not match $length")
+          first
+        case 0 =>
+          assert(length == 0, s"empty ByteString length must be 0, was 
$length")
+          ByteString.empty
+      }
+
+    private def isSimpleFragment(byteString: ByteString): Boolean =
+      byteString match {
+        case _: ByteString1C | _: ByteString1 => true
+        case _: ByteString2 | _: ByteStrings  => false
+      }
+
+    def readFromInputStream(is: ObjectInputStream): ByteString =
+      ByteString2(ByteString1.readFromInputStream(is), ByteString1.readFromInputStream(is))
+  }
+
+  /**
+   * A ByteString with exactly two simple fragments.
+   */
+  private[pekko] final class ByteString2 private (
+      private val first: ByteString,
+      private val second: ByteString,
+      val length: Int)
+      extends ByteString
+      with Serializable {
+    assert(ByteString2.isSimpleFragment(first),
+      s"first must be a simple ByteString fragment, was ${first.getClass.getName}")
+    assert(
+      ByteString2.isSimpleFragment(second),
+      s"second must be a simple ByteString fragment, was ${second.getClass.getName}")
+    assert(first.nonEmpty, "first must not be empty")
+    assert(second.nonEmpty, "second must not be empty")
+    assert(
+      first.length + second.length == length,
+      s"ByteString2 length ${first.length + second.length} did not match $length")
+
+    private[this] val firstLength: Int = first.length
+
+    def apply(idx: Int): Byte =
+      if (0 <= idx && idx < length) {
+        byteAtUnchecked(idx)
+      } else throw new IndexOutOfBoundsException(idx.toString)
+
+    /** Avoid `iterator` in performance sensitive code, call ops directly on ByteString instead */
+    override def iterator: ByteIterator.MultiByteArrayIterator =
+      ByteIterator.MultiByteArrayIterator(byteArrayIterator(first) :: byteArrayIterator(second) :: Nil)
+
+    private def byteArrayIterator(byteString: ByteString): ByteIterator.ByteArrayIterator =
+      byteString match {
+        case b: ByteString1C                 => b.iterator
+        case b: ByteString1                  => b.iterator
+        case _: ByteString2 | _: ByteStrings =>
+          throw new IllegalStateException("ByteString2 fragments must be compact or sliced ByteStrings")
+      }
+
+    def ++(that: ByteString): ByteString = {
+      if (that.isEmpty) this
+      else if (this.isEmpty) that
+      else
+        that match {
+          case b: ByteString1C => ByteStrings(this, b)
+          case b: ByteString1  => ByteStrings(this, b)
+          case bs: ByteString2 => ByteStrings(this, bs)
+          case bs: ByteStrings => ByteStrings(this, bs)
+        }
+    }
+
+    private[pekko] def byteStringCompanion = ByteString2
+
+    def isCompact: Boolean = false
+
+    override def copyToBuffer(buffer: ByteBuffer): Int = {
+      val written = first.copyToBuffer(buffer)
+      if (buffer.hasRemaining) written + second.copyToBuffer(buffer)
+      else written
+    }
+
+    override def copyToBuffer(buffer: ByteBuffer, offset: Int): Int =
+      if (offset <= 0) {
+        val written = first.copyToBuffer(buffer)
+        if (buffer.hasRemaining) written + second.copyToBuffer(buffer)
+        else written
+      } else if (offset >= firstLength) {
+        second.copyToBuffer(buffer, offset - firstLength)
+      } else {
+        val written = first.copyToBuffer(buffer, offset)
+        if (buffer.hasRemaining) written + second.copyToBuffer(buffer)
+        else written
+      }
+
+    def compact: CompactByteString = {
+      val ar = new Array[Byte](length)
+      first.copyToArray(ar, 0, firstLength)
+      second.copyToArray(ar, firstLength, length - firstLength)
+      ByteString1C(ar)
+    }
+
+    def asByteBuffer: ByteBuffer = compact.asByteBuffer
+
+    def asByteBuffers: scala.collection.immutable.Iterable[ByteBuffer] =
+      List(first.asByteBuffer, second.asByteBuffer)
+
+    override def asInputStream: InputStream =
+      new SequenceInputStream(Iterator(first.asInputStream, second.asInputStream).asJavaEnumeration)
+
+    def decodeString(charset: String): String = compact.decodeString(charset)
+
+    def decodeString(charset: Charset): String = compact.decodeString(charset)
+
+    override def decodeBase64: ByteString = compact.decodeBase64
+
+    override def encodeBase64: ByteString = compact.encodeBase64
+
+    private[pekko] def writeToOutputStream(os: ObjectOutputStream): Unit = {
+      first.writeToOutputStream(os)
+      second.writeToOutputStream(os)
+    }
+
+    override def take(n: Int): ByteString =
+      if (n <= 0) ByteString.empty
+      else if (n >= length) this
+      else {
+        if (n <= firstLength) first.take(n)
+        else ByteString2(first, second.take(n - firstLength), n)
+      }
+
+    override def dropRight(n: Int): ByteString =
+      if (0 < n && n < length) {
+        val secondLength = length - firstLength
+        if (n < secondLength) ByteString2(first, second.dropRight(n), length - n)
+        else first.dropRight(n - secondLength)
+      } else if (n >= length) ByteString.empty
+      else this
+
+    override def slice(from: Int, until: Int): ByteString = {
+      val lo = math.max(from, 0)
+      val hi = math.min(until, length)
+      if (lo >= hi) ByteString.empty
+      else if (lo == 0 && hi == length) this
+      else drop(lo).take(hi - lo)
+    }
+
+    override def drop(n: Int): ByteString =
+      if (n <= 0) this
+      else if (n >= length) ByteString.empty
+      else {
+        if (n < firstLength) ByteString2(first.drop(n), second, length - n)
+        else second.drop(n - firstLength)
+      }
+
+    override def indexOf[B >: Byte](elem: B, from: Int): Int =
+      if (from >= length) -1
+      else {
+        val start = math.max(from, 0)
+        if (start < firstLength) {
+          val firstIndex = first.indexOf(elem, start)
+          if (firstIndex >= 0) firstIndex
+          else {
+            val secondIndex = second.indexOf(elem, 0)
+            if (secondIndex >= 0) firstLength + secondIndex else -1
+          }
+        } else {
+          val secondIndex = second.indexOf(elem, start - firstLength)
+          if (secondIndex >= 0) firstLength + secondIndex else -1
+        }
+      }
+
+    override def indexOf(elem: Byte, from: Int): Int =
+      if (from >= length) -1
+      else {
+        val start = math.max(from, 0)
+        if (start < firstLength) {
+          val firstIndex = first.indexOf(elem, start)
+          if (firstIndex >= 0) firstIndex
+          else {
+            val secondIndex = second.indexOf(elem, 0)
+            if (secondIndex >= 0) firstLength + secondIndex else -1
+          }
+        } else {
+          val secondIndex = second.indexOf(elem, start - firstLength)
+          if (secondIndex >= 0) firstLength + secondIndex else -1
+        }
+      }
+
+    override def indexOf(elem: Byte, from: Int, to: Int): Int = {
+      val start = math.max(from, 0)
+      val end = math.min(to, length)
+      if (start >= end) -1
+      else {
+        if (start < firstLength) {
+          val firstIndex = first.indexOf(elem, start, math.min(end, firstLength))
+          if (firstIndex >= 0) firstIndex
+          else if (end > firstLength) {
+            val secondIndex = second.indexOf(elem, 0, end - firstLength)
+            if (secondIndex >= 0) firstLength + secondIndex else -1
+          } else -1
+        } else {
+          val secondIndex = second.indexOf(elem, start - firstLength, end - firstLength)
+          if (secondIndex >= 0) firstLength + secondIndex else -1
+        }
+      }
+    }
+
+    override def lastIndexOf[B >: Byte](elem: B, end: Int): Int =
+      if (end < 0) -1
+      else {
+        val cappedEnd = math.min(end, length - 1)
+        if (cappedEnd >= firstLength) {
+          val secondIndex = second.lastIndexOf(elem, cappedEnd - firstLength)
+          if (secondIndex >= 0) firstLength + secondIndex
+          else first.lastIndexOf(elem, firstLength - 1)
+        } else first.lastIndexOf(elem, cappedEnd)
+      }
+
+    override def lastIndexOf(elem: Byte, end: Int): Int =
+      if (end < 0) -1
+      else {
+        val cappedEnd = math.min(end, length - 1)
+        if (cappedEnd >= firstLength) {
+          val secondIndex = second.lastIndexOf(elem, cappedEnd - firstLength)
+          if (secondIndex >= 0) firstLength + secondIndex
+          else first.lastIndexOf(elem, firstLength - 1)
+        } else first.lastIndexOf(elem, cappedEnd)
+      }
+
+    override def copyToArray[B >: Byte](dest: Array[B], start: Int, len: Int): Int = {
+      val totalToCopy = math.max(0, math.min(math.min(len, length), dest.length - start))
+      if (totalToCopy > 0) {
+        val firstCopied = first.copyToArray(dest, start, totalToCopy)
+        if (firstCopied < totalToCopy)
+          second.copyToArray(dest, start + firstCopied, totalToCopy - firstCopied)
+      }
+      totalToCopy
+    }
+
+    override def foreach[@specialized U](f: Byte => U): Unit = {
+      first.foreach(f)
+      second.foreach(f)
+    }
+
+    override def startsWith(bytes: Array[Byte], offset: Int): Boolean = {
+      val needleLen = bytes.length
+      if (length - offset < needleLen) return false
+      if (needleLen == 0) return true
+      val lo = math.max(offset, 0)
+      if (lo >= firstLength) {
+        // range starts entirely in second
+        second.matchesAt(lo - firstLength, bytes, 0, needleLen)
+      } else if (lo + needleLen <= firstLength) {
+        // range ends entirely in first
+        first.matchesAt(lo, bytes, 0, needleLen)
+      } else {
+        // range spans both fragments
+        val firstPart = firstLength - lo
+        first.matchesAt(lo, bytes, 0, firstPart) &&
+        second.matchesAt(0, bytes, firstPart, needleLen - firstPart)
+      }
+    }
+
+    override def endsWith(bytes: Array[Byte]): Boolean = {
+      val needleLen = bytes.length
+      if (length < needleLen) return false
+      if (needleLen == 0) return true
+      val startPos = length - needleLen
+      if (startPos >= firstLength) {
+        // range is entirely in second
+        second.matchesAt(startPos - firstLength, bytes, 0, needleLen)
+      } else {
+        // range spans both fragments
+        val firstPart = firstLength - startPos
+        first.matchesAt(startPos, bytes, 0, firstPart) &&
+        second.matchesAt(0, bytes, firstPart, needleLen - firstPart)
+      }
+    }
+
+    override def map[A](f: Byte => Byte): ByteString = {
+      val result = new Array[Byte](length)
+      val secondLen = length - firstLength
+      var i = 0
+      while (i < firstLength) {
+        result(i) = f(first.byteAtUnchecked(i))
+        i += 1
+      }
+      var j = 0
+      while (j < secondLen) {
+        result(firstLength + j) = f(second.byteAtUnchecked(j))
+        j += 1
+      }
+      ByteString1C(result)
+    }
+
+    private[this] def byteAt(offset: Int, firstLength: Int): Byte =
+      if (offset < firstLength) first.byteAtUnchecked(offset) else second.byteAtUnchecked(offset - firstLength)
+
+    private[pekko] override def byteAtUnchecked(offset: Int): Byte = {
+      val firstLength = this.firstLength
+      byteAt(offset, firstLength)
+    }
+
+    private[pekko] override def readShortBEUnchecked(offset: Int): Short = {
+      val firstLen = firstLength
+      if (offset + java.lang.Short.BYTES <= firstLen) first.readShortBEUnchecked(offset)
+      else if (offset >= firstLen) second.readShortBEUnchecked(offset - firstLen)
+      else SWARUtil.getShortBE(byteAt(offset, firstLen), byteAt(offset + 1, firstLen))
+    }
+
+    private[pekko] override def readShortLEUnchecked(offset: Int): Short = {
+      val firstLen = firstLength
+      if (offset + java.lang.Short.BYTES <= firstLen) first.readShortLEUnchecked(offset)
+      else if (offset >= firstLen) second.readShortLEUnchecked(offset - firstLen)
+      else SWARUtil.getShortLE(byteAt(offset, firstLen), byteAt(offset + 1, firstLen))
+    }
+
+    private[pekko] override def readIntBEUnchecked(offset: Int): Int = {
+      val firstLen = firstLength
+      if (offset + java.lang.Integer.BYTES <= firstLen) first.readIntBEUnchecked(offset)
+      else if (offset >= firstLen) second.readIntBEUnchecked(offset - firstLen)
+      else {
+        SWARUtil.getIntBE(
+          byteAt(offset, firstLen),
+          byteAt(offset + 1, firstLen),
+          byteAt(offset + 2, firstLen),
+          byteAt(offset + 3, firstLen))
+      }
+    }
+
+    private[pekko] override def readIntLEUnchecked(offset: Int): Int = {
+      val firstLen = firstLength
+      if (offset + java.lang.Integer.BYTES <= firstLen) first.readIntLEUnchecked(offset)
+      else if (offset >= firstLen) second.readIntLEUnchecked(offset - firstLen)
+      else {
+        SWARUtil.getIntLE(
+          byteAt(offset, firstLen),
+          byteAt(offset + 1, firstLen),
+          byteAt(offset + 2, firstLen),
+          byteAt(offset + 3, firstLen))
+      }
+    }
+
+    private[pekko] override def readLongBEUnchecked(offset: Int): Long = {
+      val firstLen = firstLength
+      if (offset + java.lang.Long.BYTES <= firstLen) first.readLongBEUnchecked(offset)
+      else if (offset >= firstLen) second.readLongBEUnchecked(offset - firstLen)
+      else {
+        SWARUtil.getLongBE(
+          byteAt(offset, firstLen),
+          byteAt(offset + 1, firstLen),
+          byteAt(offset + 2, firstLen),
+          byteAt(offset + 3, firstLen),
+          byteAt(offset + 4, firstLen),
+          byteAt(offset + 5, firstLen),
+          byteAt(offset + 6, firstLen),
+          byteAt(offset + 7, firstLen))
+      }
+    }
+
+    private[pekko] override def readLongLEUnchecked(offset: Int): Long = {
+      val firstLen = firstLength
+      if (offset + java.lang.Long.BYTES <= firstLen) first.readLongLEUnchecked(offset)
+      else if (offset >= firstLen) second.readLongLEUnchecked(offset - firstLen)
+      else {
+        SWARUtil.getLongLE(
+          byteAt(offset, firstLen),
+          byteAt(offset + 1, firstLen),
+          byteAt(offset + 2, firstLen),
+          byteAt(offset + 3, firstLen),
+          byteAt(offset + 4, firstLen),
+          byteAt(offset + 5, firstLen),
+          byteAt(offset + 6, firstLen),
+          byteAt(offset + 7, firstLen))
+      }
+    }
+
+    private[pekko] def addFragmentsTo(builder: VectorBuilder[ByteString1]): Unit = {
+      ByteStrings.addFragments(builder, first)
+      ByteStrings.addFragments(builder, second)
+    }
+
+    protected def writeReplace(): AnyRef = new SerializationProxy(this)
+  }
+
   /**
    * A ByteString with 2 or more fragments.
    */
@@ -980,6 +1462,7 @@ object ByteString {
         that match {
           case b: ByteString1C => ByteStrings(this, b.toByteString1)
           case b: ByteString1  => ByteStrings(this, b)
+          case bs: ByteString2 => ByteStrings(this, bs)
           case bs: ByteStrings => ByteStrings(this, bs)
         }
     }
@@ -1362,7 +1845,7 @@ object ByteString {
   }
 
   private[pekko] object Companion {
-    private val companionMap = Seq(ByteString1, ByteString1C, ByteStrings)
+    private val companionMap = Seq(ByteString1, ByteString1C, ByteStrings, ByteString2)
       .map(x => x.SerializationIdentity -> x)
       .toMap
       .withDefault(x => throw new IllegalArgumentException("Invalid serialization id " + x))
@@ -1804,6 +2287,23 @@ sealed abstract class ByteString
     }
   }
 
+  /**
+   * INTERNAL API: compare `len` bytes from this ByteString starting at `haystackOffset`
+   * against `needle[needleOffset..needleOffset+len)`.
+   * Overridden by ByteString1C and ByteString1 with SWAR-based 8-byte comparisons.
+   */
+  private[pekko] def matchesAt(haystackOffset: Int, needle: Array[Byte], needleOffset: Int, len: Int): Boolean = {
+    var hIdx = haystackOffset
+    var nIdx = needleOffset
+    val end = needleOffset + len
+    while (nIdx < end) {
+      if (apply(hIdx) != needle(nIdx)) return false
+      hIdx += 1
+      nIdx += 1
+    }
+    true
+  }
+
   override def grouped(size: Int): Iterator[ByteString] = {
     if (size <= 0) {
       throw new IllegalArgumentException(s"size=$size must be positive")
@@ -2067,6 +2567,13 @@ sealed abstract class ByteString
     readLongLEUnchecked(offset)
   }
 
+  /**
+   * INTERNAL API
+   * Fast byte access for callers that have already checked bounds.
+   */
+  private[pekko] def byteAtUnchecked(offset: Int): Byte =
+    apply(offset)
+
   /**
    * INTERNAL API
    * Optimized in subclasses when we have byte arrays where we can use {@link SWARUtil}
@@ -2249,7 +2756,7 @@ sealed abstract class CompactByteString extends ByteString with Serializable {
 final class ByteStringBuilder extends Builder[Byte, ByteString] {
   builder =>
 
-  import ByteString.{ ByteString1, ByteString1C, ByteStrings }
+  import ByteString.{ ByteString1, ByteString1C, ByteString2, ByteStrings }
   private var _length: Int = 0
   private val _builder: VectorBuilder[ByteString1] = new VectorBuilder[ByteString1]()
   private var _temp: Array[Byte] = _
@@ -2329,6 +2836,9 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] {
         case b: ByteString1 =>
           _builder += b
           _length += b.length
+        case b: ByteString2 =>
+          b.addFragmentsTo(_builder)
+          _length += b.length
         case bs: ByteStrings =>
           _builder ++= bs.bytestrings
           _length += bs.length
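The read path in `ByteString2` routes each access to whichever fragment owns the offset, and only falls back to byte-by-byte assembly when a multi-byte read straddles the fragment boundary. The following is a minimal sketch of that routing under stated assumptions: plain `Array[Byte]` fragments stand in for `ByteString1C`/`ByteString1`, and `java.nio.ByteBuffer` stands in for the `*Unchecked` and `SWARUtil` helpers. `TwoPart` is a hypothetical name, not part of Pekko.

```scala
import java.nio.ByteBuffer

// Hypothetical, simplified stand-in for a two-fragment ByteString: two
// non-empty byte arrays viewed as one logical sequence (not Pekko's ByteString2).
final class TwoPart(first: Array[Byte], second: Array[Byte]) {
  require(first.nonEmpty && second.nonEmpty, "both fragments must be non-empty")

  private val firstLength: Int = first.length
  val length: Int = firstLength + second.length

  // Route a logical index to the fragment that holds it.
  def apply(idx: Int): Byte =
    if (idx < 0 || idx >= length) throw new IndexOutOfBoundsException(idx.toString)
    else if (idx < firstLength) first(idx)
    else second(idx - firstLength)

  // Big-endian Int read with the same three-way case split as
  // ByteString2.readIntBEUnchecked: fully in `first`, fully in `second`,
  // or straddling the boundary (assembled byte by byte).
  def readIntBE(offset: Int): Int =
    if (offset < 0 || offset > length - 4) throw new IndexOutOfBoundsException(offset.toString)
    else if (offset + 4 <= firstLength) ByteBuffer.wrap(first).getInt(offset)
    else if (offset >= firstLength) ByteBuffer.wrap(second).getInt(offset - firstLength)
    else
      (apply(offset) & 0xFF) << 24 |
        (apply(offset + 1) & 0xFF) << 16 |
        (apply(offset + 2) & 0xFF) << 8 |
        (apply(offset + 3) & 0xFF)
}
```

In this sketch only the straddling case pays for four separate byte lookups; the two fast paths delegate to a single contiguous read, which is the same reason the methods above test `offset + N <= firstLen` and `offset >= firstLen` before assembling bytes.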
diff --git a/actor/src/main/scala/org/apache/pekko/util/SWARUtil.scala b/actor/src/main/scala/org/apache/pekko/util/SWARUtil.scala
index 7dc9d70d58..76e01c1a34 100644
--- a/actor/src/main/scala/org/apache/pekko/util/SWARUtil.scala
+++ b/actor/src/main/scala/org/apache/pekko/util/SWARUtil.scala
@@ -357,6 +357,56 @@ private[pekko] object SWARUtil {
   private[pekko] def getShortLEWithoutMethodHandle(array: Array[Byte], index: Int): Short =
     ((array(index) & 0xFF) | (array(index + 1) & 0xFF) << 8).toShort
 
+  private[pekko] def getShortBE(b0: Byte, b1: Byte): Short = ((b0 & 0xFF) << 8 | (b1 & 0xFF)).toShort
+
+  private[pekko] def getShortLE(b0: Byte, b1: Byte): Short = ((b0 & 0xFF) | (b1 & 0xFF) << 8).toShort
+
+  private[pekko] def getIntBE(b0: Byte, b1: Byte, b2: Byte, b3: Byte): Int = (b0 & 0xFF) << 24 |
+    (b1 & 0xFF) << 16 |
+    (b2 & 0xFF) << 8 |
+    (b3 & 0xFF)
+
+  private[pekko] def getIntLE(b0: Byte, b1: Byte, b2: Byte, b3: Byte): Int = (b0 & 0xFF) |
+    (b1 & 0xFF) << 8 |
+    (b2 & 0xFF) << 16 |
+    (b3 & 0xFF) << 24
+
+  private[pekko] def getLongBE(
+      b0: Byte,
+      b1: Byte,
+      b2: Byte,
+      b3: Byte,
+      b4: Byte,
+      b5: Byte,
+      b6: Byte,
+      b7: Byte): Long =
+    (b0.toLong & 0xFFL) << 56 |
+    (b1.toLong & 0xFFL) << 48 |
+    (b2.toLong & 0xFFL) << 40 |
+    (b3.toLong & 0xFFL) << 32 |
+    (b4.toLong & 0xFFL) << 24 |
+    (b5.toLong & 0xFFL) << 16 |
+    (b6.toLong & 0xFFL) << 8 |
+    (b7.toLong & 0xFFL)
+
+  private[pekko] def getLongLE(
+      b0: Byte,
+      b1: Byte,
+      b2: Byte,
+      b3: Byte,
+      b4: Byte,
+      b5: Byte,
+      b6: Byte,
+      b7: Byte): Long =
+    (b0.toLong & 0xFFL) |
+    (b1.toLong & 0xFFL) << 8 |
+    (b2.toLong & 0xFFL) << 16 |
+    (b3.toLong & 0xFFL) << 24 |
+    (b4.toLong & 0xFFL) << 32 |
+    (b5.toLong & 0xFFL) << 40 |
+    (b6.toLong & 0xFFL) << 48 |
+    (b7.toLong & 0xFFL) << 56
+
   private[pekko] def putIntBEWithoutMethodHandle(array: Array[Byte], index: Int, value: Int): Unit = {
     array(index) = (value >>> 24).toByte
     array(index + 1) = (value >>> 16).toByte
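The `getShortBE`/`getIntBE`/`getLongBE` helpers added to `SWARUtil` assemble a value from individual bytes with shift-and-mask, masking each byte with `0xFF` so that a negative byte does not sign-extend into the higher bits. Because `SWARUtil` is `private[pekko]`, the sketch below copies the Int layouts into a hypothetical standalone `ByteAssembly` object (the Long variant is written as an equivalent loop rather than unrolled) and cross-checks them against `java.nio.ByteBuffer` with an explicit `ByteOrder`.

```scala
import java.nio.{ ByteBuffer, ByteOrder }

// Hypothetical standalone mirror of the shift-and-mask layout used by the
// SWARUtil byte-assembly helpers; not the Pekko implementation itself.
object ByteAssembly {
  // Masking with 0xFF turns each (signed) Byte into an unsigned 0..255 Int.
  def getIntBE(b0: Byte, b1: Byte, b2: Byte, b3: Byte): Int =
    (b0 & 0xFF) << 24 | (b1 & 0xFF) << 16 | (b2 & 0xFF) << 8 | (b3 & 0xFF)

  def getIntLE(b0: Byte, b1: Byte, b2: Byte, b3: Byte): Int =
    (b0 & 0xFF) | (b1 & 0xFF) << 8 | (b2 & 0xFF) << 16 | (b3 & 0xFF) << 24

  // Loop form of a big-endian Long read: shift the accumulator left one byte
  // per step and OR in the next unsigned byte.
  def getLongBE(bs: Array[Byte], off: Int): Long = {
    var acc = 0L
    var i = 0
    while (i < 8) { acc = (acc << 8) | (bs(off + i).toLong & 0xFFL); i += 1 }
    acc
  }

  // Reference values from java.nio, used only for cross-checking.
  def nioIntBE(bs: Array[Byte]): Int = ByteBuffer.wrap(bs).order(ByteOrder.BIG_ENDIAN).getInt(0)
  def nioIntLE(bs: Array[Byte]): Int = ByteBuffer.wrap(bs).order(ByteOrder.LITTLE_ENDIAN).getInt(0)
}
```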
diff --git a/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_append_Benchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_append_Benchmark.scala
index 23e2327271..69262bc48d 100644
--- a/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_append_Benchmark.scala
+++ b/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_append_Benchmark.scala
@@ -13,6 +13,7 @@
 
 package org.apache.pekko.util
 
+import java.nio.ByteBuffer
 import java.util.concurrent.TimeUnit
 
 import org.openjdk.jmh.annotations._
@@ -27,6 +28,15 @@ import org.openjdk.jmh.infra.Blackhole
 class ByteString_append_Benchmark {
 
   private val bs = ByteString(Array.ofDim[Byte](10))
+  private val header = ByteString.fromArrayUnsafe(Array.tabulate[Byte](9)(i => (i + 1).toByte))
+  private val payload = ByteString.fromArrayUnsafe(Array.fill[Byte](4096)(42))
+  private val frame = header ++ payload
+  private val shortHeaderPrefix = ByteString.fromArrayUnsafe(Array[Byte](1, 2))
+  private val headerTailAndPayload = ByteString.fromArrayUnsafe(Array.tabulate[Byte](4096 + 7)(i => (i + 3).toByte))
+  private val crossBoundaryFrame = shortHeaderPrefix ++ headerTailAndPayload
+  private val compactFrame = frame.compact
+  private val outputBuffer = ByteBuffer.allocateDirect(9 + 4096)
+  private val outputArray = new Array[Byte](9 + 4096)
 
   @Benchmark
   @OperationsPerInvocation(10000)
@@ -55,6 +65,44 @@ class ByteString_append_Benchmark {
     }
     bh.consume(result)
   }
+
+  @Benchmark
+  def appendTwo(): ByteString =
+    header ++ payload
+
+  @Benchmark
+  def readTwoPartHeader(): Int =
+    frame.readIntBE(0) + frame.readIntBE(4) + frame(8)
+
+  @Benchmark
+  def readTwoPartCrossBoundaryHeader(): Int =
+    crossBoundaryFrame.readIntBE(0) + crossBoundaryFrame.readIntBE(2) + crossBoundaryFrame(8)
+
+  @Benchmark
+  def readCompactHeader(): Int =
+    compactFrame.readIntBE(0) + compactFrame.readIntBE(4) + compactFrame(8)
+
+  @Benchmark
+  def appendTwoAndReadHeader(): Int = {
+    val frame = header ++ payload
+    frame.readIntBE(0) + frame.readIntBE(4) + frame(8)
+  }
+
+  @Benchmark
+  def appendTwoAndReadCrossBoundaryHeader(): Int = {
+    val frame = shortHeaderPrefix ++ headerTailAndPayload
+    frame.readIntBE(0) + frame.readIntBE(2) + frame(8)
+  }
+
+  @Benchmark
+  def appendTwoAndCopyToBuffer(): Int = {
+    outputBuffer.clear()
+    (header ++ payload).copyToBuffer(outputBuffer)
+  }
+
+  @Benchmark
+  def appendTwoAndCopyToArray(): Int = (header ++ payload).copyToArray(outputArray, 0, outputArray.length)
+
   @Benchmark
   @OperationsPerInvocation(10000)
   def builderOne(bh: Blackhole): Unit = {
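The `appendTwoAndCopyToArray` benchmark exercises `ByteString2.copyToArray`, which clamps the requested length to both the logical length and the destination capacity, copies from `first`, and continues into `second` only if `first` was exhausted. A minimal standalone sketch of that clamping logic over two plain arrays follows; `TwoPartCopy` is a hypothetical helper, and unlike the real method it assumes `start >= 0` and a destination of type `Array[Byte]`.

```scala
// Hypothetical helper mirroring the clamping in ByteString2.copyToArray,
// using two plain arrays instead of ByteString fragments. Assumes start >= 0.
object TwoPartCopy {
  def copyToArray(first: Array[Byte], second: Array[Byte], dest: Array[Byte], start: Int, len: Int): Int = {
    val length = first.length + second.length
    // Never copy more than requested, more than exists, or more than fits in dest.
    val total = math.max(0, math.min(math.min(len, length), dest.length - start))
    if (total > 0) {
      val fromFirst = math.min(total, first.length)
      System.arraycopy(first, 0, dest, start, fromFirst)
      // Only touch the second fragment if the first one was exhausted.
      if (fromFirst < total)
        System.arraycopy(second, 0, dest, start + fromFirst, total - fromFirst)
    }
    total
  }
}
```

The return value is the number of bytes actually written, so a caller can detect truncation by comparing it with the requested `len`.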
diff --git a/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_concat3way_Benchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_concat3way_Benchmark.scala
new file mode 100644
index 0000000000..e4841dc064
--- /dev/null
+++ b/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_concat3way_Benchmark.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+package org.apache.pekko.util
+
+import java.nio.{ ByteBuffer, ByteOrder }
+import java.util.concurrent.TimeUnit
+
+import org.openjdk.jmh.annotations._
+import org.openjdk.jmh.infra.Blackhole
+
+import org.apache.pekko.util.ByteString.{ ByteString1C, ByteString2 }
+
+/**
+ * Directional benchmark for PR #2924, comparing the two candidate strategies for
+ * the gRPC-shaped "5-byte header ++ payload" hot path:
+ *
+ *   - copy: caller assembles the result into a fresh array and wraps it with
+ *           `ByteString.fromArrayUnsafe`, yielding a contiguous `ByteString1C`.
+ *   - bs2:  `ByteString2.apply(header, payload)`, the new zero-copy 2-fragment impl.
+ *
+ * For each strategy we time the construction plus common downstream ops
+ * (parser reads, writer ops, slicing). Payload sizes cover small unary RPC messages
+ * (5+64B), typical RPC bodies (5+1KB), medium streaming chunks (5+16KB) and large
+ * streaming chunks (5+64KB, 5+256KB).
+ */
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@BenchmarkMode(Array(Mode.Throughput))
+@Fork(1)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+class ByteString_concat3way_Benchmark {
+
+  @Param(Array("64", "1024", "16384", "65536", "262144"))
+  var payloadSize: Int = _
+
+  private implicit val byteOrder: ByteOrder = ByteOrder.BIG_ENDIAN
+
+  // 5-byte gRPC frame header: 1-byte compressed flag + 4-byte big-endian length
+  private val headerArr: Array[Byte] = Array[Byte](0, 0, 0, 0, 0)
+  private var payloadArr: Array[Byte] = _
+  private var header1C: ByteString1C = _
+  private var payload1C: ByteString1C = _
+
+  private var preCopy: ByteString = _
+  private var preBs2: ByteString = _
+
+  private var outBuf: ByteBuffer = _
+
+  @Setup
+  def setup(): Unit = {
+    payloadArr = Array.fill[Byte](payloadSize)(42)
+    header1C = ByteString.fromArrayUnsafe(headerArr).asInstanceOf[ByteString1C]
+    payload1C = ByteString.fromArrayUnsafe(payloadArr).asInstanceOf[ByteString1C]
+    preCopy = doCopy()
+    preBs2 = doBs2()
+    outBuf = ByteBuffer.allocate(headerArr.length + payloadSize)
+  }
+
+  // === concat-only (allocation cost) ===
+
+  @Benchmark
+  def concat_copy(): ByteString = doCopy()
+
+  @Benchmark
+  def concat_bs2(): ByteString = doBs2()
+
+  // === concat + parser reads via iterator (naive parser path) ===
+
+  @Benchmark
+  def concatIter_copy(): Int = {
+    val r = doCopy()
+    r.iterator.getInt + r(4).toInt
+  }
+
+  @Benchmark
+  def concatIter_bs2(): Int = {
+    val r = doBs2()
+    r.iterator.getInt + r(4).toInt
+  }
+
+  // === concat + parser reads via readIntBE (efficient parser path) ===
+
+  @Benchmark
+  def concatRead_copy(): Int = {
+    val r = doCopy()
+    r.readIntBE(0) + r(4).toInt
+  }
+
+  @Benchmark
+  def concatRead_bs2(): Int = {
+    val r = doBs2()
+    r.readIntBE(0) + r(4).toInt
+  }
+
+  // === concat + copyToBuffer (Netty/NIO write path) ===
+
+  @Benchmark
+  def concatCopyToBuf_copy(): Int = {
+    outBuf.clear()
+    doCopy().copyToBuffer(outBuf)
+  }
+
+  @Benchmark
+  def concatCopyToBuf_bs2(): Int = {
+    outBuf.clear()
+    doBs2().copyToBuffer(outBuf)
+  }
+
+  // === concat + asByteBuffers (gather I/O write path) ===
+
+  @Benchmark
+  def concatAsByteBuffers_copy(bh: Blackhole): Unit =
+    bh.consume(doCopy().asByteBuffers)
+
+  @Benchmark
+  def concatAsByteBuffers_bs2(bh: Blackhole): Unit =
+    bh.consume(doBs2().asByteBuffers)
+
+  // === concat + drop(5) (strip header) ===
+
+  @Benchmark
+  def concatDrop5_copy(): ByteString = doCopy().drop(5)
+
+  @Benchmark
+  def concatDrop5_bs2(): ByteString = doBs2().drop(5)
+
+  // === concat + take(5) (extract header) ===
+
+  @Benchmark
+  def concatTake5_copy(): ByteString = doCopy().take(5)
+
+  @Benchmark
+  def concatTake5_bs2(): ByteString = doBs2().take(5)
+
+  // === downstream-only on pre-built results ===
+
+  @Benchmark
+  def downIter_copy(): Int = preCopy.iterator.getInt + preCopy(4).toInt
+  @Benchmark
+  def downIter_bs2(): Int = preBs2.iterator.getInt + preBs2(4).toInt
+
+  @Benchmark
+  def downRead_copy(): Int = preCopy.readIntBE(0) + preCopy(4).toInt
+  @Benchmark
+  def downRead_bs2(): Int = preBs2.readIntBE(0) + preBs2(4).toInt
+
+  @Benchmark
+  def downCopyToBuf_copy(bh: Blackhole): Unit = {
+    outBuf.clear(); bh.consume(preCopy.copyToBuffer(outBuf))
+  }
+  @Benchmark
+  def downCopyToBuf_bs2(bh: Blackhole): Unit = {
+    outBuf.clear(); bh.consume(preBs2.copyToBuffer(outBuf))
+  }
+
+  @Benchmark
+  def downAsByteBuffers_copy(bh: Blackhole): Unit = bh.consume(preCopy.asByteBuffers)
+  @Benchmark
+  def downAsByteBuffers_bs2(bh: Blackhole): Unit = bh.consume(preBs2.asByteBuffers)
+
+  // ---- helpers ----
+
+  private def doCopy(): ByteString = {
+    val merged = new Array[Byte](headerArr.length + payloadSize)
+    System.arraycopy(headerArr, 0, merged, 0, headerArr.length)
+    System.arraycopy(payloadArr, 0, merged, headerArr.length, payloadSize)
+    ByteString.fromArrayUnsafe(merged)
+  }
+
+  private def doBs2(): ByteString =
+    ByteString2.apply(header1C, payload1C)
+}
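The benchmark's `doCopy` helper models the "copy" strategy for a gRPC length-prefixed message, whose 5-byte prefix is a 1-byte compressed flag followed by a 4-byte big-endian message length. A minimal sketch of building and parsing such a frame with the copy strategy, using only `java.nio.ByteBuffer` and plain arrays; `GrpcFrameSketch` and its methods are hypothetical and are not part of Pekko or any gRPC library.

```scala
import java.nio.ByteBuffer

// Hypothetical helpers for a gRPC-style length-prefixed frame:
// [compressed flag: 1 byte][message length: 4 bytes, big-endian][message]
object GrpcFrameSketch {
  val HeaderLength = 5

  // ByteBuffer defaults to big-endian, which matches the gRPC prefix layout.
  def header(payloadLength: Int, compressed: Boolean): Array[Byte] = {
    val buf = ByteBuffer.allocate(HeaderLength)
    buf.put(if (compressed) 1.toByte else 0.toByte)
    buf.putInt(payloadLength)
    buf.array()
  }

  // "copy" strategy: one fresh contiguous array holding header ++ payload,
  // analogous to doCopy() in the benchmark above.
  def frameByCopy(payload: Array[Byte], compressed: Boolean): Array[Byte] = {
    val hdr = header(payload.length, compressed)
    val merged = new Array[Byte](HeaderLength + payload.length)
    System.arraycopy(hdr, 0, merged, 0, HeaderLength)
    System.arraycopy(payload, 0, merged, HeaderLength, payload.length)
    merged
  }

  // Returns (compressed flag, declared message length).
  def parseHeader(frame: Array[Byte]): (Boolean, Int) = {
    require(frame.length >= HeaderLength, "frame is shorter than the 5-byte header")
    (frame(0) != 0, ByteBuffer.wrap(frame).getInt(1))
  }
}
```

The copy strategy allocates a fresh array and copies the whole payload, so its construction cost grows with payload size, whereas `ByteString2.apply(header1C, payload1C)` wraps the two existing fragments without copying; the `down*` benchmarks above measure how that choice affects later reads and writes.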

