This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 29d74ecb74 Optimize two-part ByteString concatenation (#2924)
29d74ecb74 is described below
commit 29d74ecb74b26bb0ddcf9c0fa7258f489434a585
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat May 9 22:56:35 2026 +0800
Optimize two-part ByteString concatenation (#2924)
* feat: optimize two-part ByteString concatenation
Motivation:
Small ByteString concatenation currently promotes two fragments into
ByteStrings backed by a Vector, which adds allocation and hurts hot paths that
append a header and payload before reading or copying.
Modification:
Add an internal two-fragment ByteString representation for simple non-empty
fragments, flatten it when appending beyond two fragments, and add focused JMH
coverage for append, read, copy, and cross-boundary header reads.
Result:
JMH shows appendTwo allocation dropping from 160 B/op on main to 32 B/op on
this branch, with improved read and copy throughput while ByteStringSpec and
MiMa pass.
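The two-fragment idea above can be sketched in isolation. This is a minimal illustration over plain arrays, assuming a hypothetical `TwoPart` class; it does not mirror the real `ByteString2` internals, only the principle that two non-empty fragments are referenced rather than copied, and that reads may straddle the boundary.

```scala
// Hypothetical, simplified model of a two-fragment byte sequence.
// The class and method names are illustrative, not Pekko API.
final class TwoPart(first: Array[Byte], second: Array[Byte]) {
  require(first.nonEmpty && second.nonEmpty, "both fragments must be non-empty")

  private val firstLength = first.length
  val length: Int = first.length + second.length

  // Index into whichever fragment holds the byte; nothing is copied.
  def apply(idx: Int): Byte =
    if (idx < 0 || idx >= length) throw new IndexOutOfBoundsException(idx.toString)
    else if (idx < firstLength) first(idx)
    else second(idx - firstLength)

  // Big-endian Int read that may straddle the fragment boundary.
  def readIntBE(offset: Int): Int = {
    if (offset < 0 || offset > length - 4) throw new IndexOutOfBoundsException(offset.toString)
    ((apply(offset) & 0xFF) << 24) |
    ((apply(offset + 1) & 0xFF) << 16) |
    ((apply(offset + 2) & 0xFF) << 8) |
    (apply(offset + 3) & 0xFF)
  }

  // Flatten into one contiguous array only when a caller asks for it.
  def compact: Array[Byte] = {
    val out = new Array[Byte](length)
    System.arraycopy(first, 0, out, 0, firstLength)
    System.arraycopy(second, 0, out, firstLength, second.length)
    out
  }
}
```

With fragments `[1, 2, 3]` and `[4, 5, 6, 7, 8]`, `readIntBE(1)` spans both fragments, the same shape as the cross-boundary read test added in this commit.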
* fix: bind ByteString2 serializer
* fix: Address ByteString2 review comments
* fix: tighten ByteString2 invariants
* test: cover ByteString2 serialization paths
* perf: avoid LazyList in ByteString2.iterator + cross-check tests + bench
Motivation:
ByteString2.iterator built MultiByteArrayIterator from `LazyList(...)`. That
allocates two cons cells plus two thunk closures per iterator, and each
call to MultiByteArrayIterator.normalize() walks the LinearSeq and forces a
thunk. For the gRPC-shaped "5-byte header ++ payload" hot path the
iterator+read overhead dominated, so reading via `iterator.getInt` on the
result was ~30x slower than the equivalent `ByteString1C` baseline.
Modification:
* ByteString2.iterator now uses a plain two-element `List` -- the same shape
that MultiByteArrayIterator already accepts (LinearSeq[ByteArrayIterator]),
but with no thunks and half the allocations.
* Add ByteString_concat3way_Benchmark: directional comparison of the two
candidate strategies for the hot path (`copy` via fromArrayUnsafe vs
`bs2` via ByteString2.apply) across realistic gRPC payload sizes
(64B / 1KB / 16KB / 64KB / 256KB), covering concat alone, concat+iterator
reads, concat+readIntBE, copyToBuffer, asByteBuffers, drop(5)/take(5),
and the same operations on pre-built results.
* Add a cross-check test in ByteStringSpec that builds the same logical bytes
as `ByteString1C` (reference) and as `ByteString2` with every possible
split point, then asserts equivalence across `apply`, full byte-by-byte
iteration, `iterator.getShort/Int/Long` (BE+LE) at every offset,
`readShortBE/LE`, `readIntBE/LE`, `readLongBE/LE`, `take/drop/takeRight/
dropRight/slice`, `copyToBuffer`, `copyToArray`, `indexOf`, `compact`,
equality and hashCode.
Result:
JMH on JDK 25 G1GC, single fork, 5x1s measurement (ops/us, higher is better):
benchmark        size (bytes)   before   after   speedup
downRead_bs2               64     16.4   730.8       44x
downRead_bs2             1024     27.2   730.8       27x
downRead_bs2            16384     25.7   727.5       28x
downRead_bs2            65536     24.4   731.0       30x
downRead_bs2           262144     18.4   730.6       40x
concatRead_bs2             64     27.2   616.0       23x
concatRead_bs2           1024     16.2   609.5       38x
concatRead_bs2          16384     21.5   608.8       28x
concatRead_bs2          65536     22.1   616.7       28x
concatRead_bs2         262144     25.7   612.8       24x
The previously dominant iterator overhead is gone: ByteString2 read paths
are now within ~1.6x of the contiguous `ByteString1C` baseline (down from
30-50x slower), while concat-and-read is several times faster than the
copy path because we avoid the System.arraycopy. All 212 ByteStringSpec
tests pass including the new cross-check.
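For context, the `copy` strategy that the benchmark compares against can be sketched with plain arrays. This is an illustrative sketch only: the helper names `frameHeader`, `copyConcat` and `payloadLength` are hypothetical, and the header layout assumed here is the usual gRPC message prefix (one compression-flag byte followed by a 4-byte big-endian payload length).

```scala
import java.nio.ByteBuffer

// Build a 5-byte header: 1 flag byte + 4-byte big-endian payload length.
// ByteBuffer is big-endian by default.
def frameHeader(payloadLength: Int, compressed: Boolean = false): Array[Byte] =
  ByteBuffer
    .allocate(5)
    .put(if (compressed) 1.toByte else 0.toByte)
    .putInt(payloadLength)
    .array()

// The "copy" strategy: allocate header + payload and copy both in.
// The cost grows with the payload size because of System.arraycopy.
def copyConcat(header: Array[Byte], payload: Array[Byte]): Array[Byte] = {
  val out = new Array[Byte](header.length + payload.length)
  System.arraycopy(header, 0, out, 0, header.length)
  System.arraycopy(payload, 0, out, header.length, payload.length)
  out
}

// Read the payload length back from the framed bytes (skipping the flag byte).
def payloadLength(framed: Array[Byte]): Int =
  ByteBuffer.wrap(framed, 1, 4).getInt()
```

The two-fragment strategy keeps references to the header and the payload instead of performing the second copy, which is why its concat cost does not depend on the payload size.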
* Optimize ByteString2: add copyToBuffer(offset), map, and SWAR-based
startsWith/endsWith (#2950)
- Add private[pekko] matchesAt to abstract ByteString class (default
byte-by-byte impl)
- Override matchesAt in ByteString1C and ByteString1 with SWAR 8-byte
comparisons
- Add optimized copyToBuffer(buffer, offset) to ByteString2 (avoids drop()
allocation)
- Improve startsWith in ByteString2 to use matchesAt (SWAR-based) instead
of byte-by-byte byteAt
- Improve endsWith in ByteString2 to use matchesAt (SWAR-based) instead of
byte-by-byte byteAt
- Add optimized map to ByteString2 (pre-allocated array + byteAtUnchecked
while-loop)
- foreach in ByteString2 already optimal (delegates to first/second
fragments)
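The 8-byte comparison idea behind `matchesAt` can be sketched as follows. This is a simplified illustration over plain arrays, not the `SWARUtil` code from this commit: the standalone `matchesAt` and `startsWith` signatures are assumptions, and `ByteBuffer.getLong` stands in for whatever word-sized read the real implementation uses.

```scala
import java.nio.ByteBuffer

// Compare `len` bytes of haystack (from haystackOffset) with needle (from needleOffset),
// eight bytes at a time where possible. Hypothetical standalone helper.
def matchesAt(
    haystack: Array[Byte], haystackOffset: Int, needle: Array[Byte], needleOffset: Int, len: Int): Boolean =
  if (len < 0 || haystackOffset < 0 || needleOffset < 0 ||
    haystackOffset > haystack.length - len || needleOffset > needle.length - len) false
  else {
    val h = ByteBuffer.wrap(haystack)
    val n = ByteBuffer.wrap(needle)
    var i = 0
    var same = true
    // Word-sized steps: one Long comparison covers eight bytes.
    while (same && len - i >= 8) {
      same = h.getLong(haystackOffset + i) == n.getLong(needleOffset + i)
      i += 8
    }
    // Tail of up to seven bytes, compared one at a time.
    while (same && i < len) {
      same = haystack(haystackOffset + i) == needle(needleOffset + i)
      i += 1
    }
    same
  }

// startsWith over two fragments: match the part of the prefix that falls in
// the first fragment, then the remainder against the start of the second.
def startsWith(first: Array[Byte], second: Array[Byte], prefix: Array[Byte]): Boolean =
  if (prefix.length > first.length + second.length) false
  else {
    val inFirst = math.min(prefix.length, first.length)
    matchesAt(first, 0, prefix, 0, inFirst) &&
    matchesAt(second, 0, prefix, inFirst, prefix.length - inFirst)
  }
```

`endsWith` follows the same pattern from the other end: match the suffix tail against the end of the second fragment, then any remainder against the end of the first.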
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/cf258ced-c5f8-4bef-8ec4-3de15df750d3
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---------
Co-authored-by: PJ Fanning <[email protected]>
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../PrimitivesSerializationSpec.scala | 34 ++
.../apache/pekko/serialization/SerializeSpec.scala | 4 +
.../org/apache/pekko/util/ByteStringSpec.scala | 203 +++++++-
actor/src/main/resources/reference.conf | 1 +
.../scala/org/apache/pekko/util/ByteString.scala | 536 ++++++++++++++++++++-
.../scala/org/apache/pekko/util/SWARUtil.scala | 50 ++
.../pekko/util/ByteString_append_Benchmark.scala | 48 ++
.../util/ByteString_concat3way_Benchmark.scala | 180 +++++++
8 files changed, 1042 insertions(+), 14 deletions(-)
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/serialization/PrimitivesSerializationSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/serialization/PrimitivesSerializationSpec.scala
index 0fdbdd44d5..b03861dd60 100644
---
a/actor-tests/src/test/scala/org/apache/pekko/serialization/PrimitivesSerializationSpec.scala
+++
b/actor-tests/src/test/scala/org/apache/pekko/serialization/PrimitivesSerializationSpec.scala
@@ -21,6 +21,7 @@ import scala.util.Random
import org.apache.pekko
import pekko.testkit.PekkoSpec
import pekko.util.ByteString
+import pekko.util.ByteString.ByteString2
import com.typesafe.config.ConfigFactory
@@ -61,6 +62,12 @@ class PrimitivesSerializationSpec extends
PekkoSpec(PrimitivesSerializationSpec.
serializer.fromBinary(buffer, "") should ===(msg)
}
+ def expectByteString2(byteString: ByteString): ByteString2 =
+ byteString match {
+ case byteString2: ByteString2 => byteString2
+ case other => fail(s"Expected ByteString2, got [${other.getClass.getName}]")
+ }
+
"LongSerializer" must {
Seq(0L, 1L, -1L, Long.MinValue, Long.MinValue + 1L, Long.MaxValue,
Long.MaxValue - 1L)
.map(_.asInstanceOf[AnyRef])
@@ -155,9 +162,13 @@ class PrimitivesSerializationSpec extends
PekkoSpec(PrimitivesSerializationSpec.
}
"ByteStringSerializer" must {
+ val twoPartByteString =
+ expectByteString2(ByteString(Array[Byte](1, 2)) ++
ByteString(Array[Byte](3, 4, 5)))
+
Seq(
"empty string" -> ByteString.empty,
"simple content" -> ByteString("hello"),
+ "two-part content" -> twoPartByteString,
"concatenated content" -> (ByteString("hello") ++ ByteString("world")),
"sliced content" -> ByteString("helloabc").take(5),
"large concatenated" ->
@@ -177,6 +188,29 @@ class PrimitivesSerializationSpec extends
PekkoSpec(PrimitivesSerializationSpec.
}
}
+ "serialize ByteString2 as contiguous bytes" in {
+ val serializer = serialization.serializerFor(twoPartByteString.getClass)
+
+ ByteString(serializer.toBinary(twoPartByteString)) should
===(ByteString(Array[Byte](1, 2, 3, 4, 5)))
+ }
+
+ "de-serialize bytes to an equal ByteString" in {
+ val serializer = serialization.serializerFor(twoPartByteString.getClass)
+
+ serializer.fromBinary(Array[Byte](1, 2, 3, 4, 5), None) should
===(twoPartByteString)
+ }
+
+ "serialize and de-serialize ByteString2 using ByteBuffers" in {
+ val serializer =
+
serialization.serializerFor(twoPartByteString.getClass).asInstanceOf[Serializer
with ByteBufferSerializer]
+
+ buffer.clear()
+ serializer.toBinary(twoPartByteString, buffer)
+ buffer.flip()
+
+ serializer.fromBinary(buffer, "") should ===(twoPartByteString)
+ }
+
"have right serializer id" in {
// checking because moved to pekko-actor
serialization.serializerFor(1L.asInstanceOf[AnyRef].getClass).identifier
=== 21
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/serialization/SerializeSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/serialization/SerializeSpec.scala
index 22c4321bdd..165d0e09ba 100644
---
a/actor-tests/src/test/scala/org/apache/pekko/serialization/SerializeSpec.scala
+++
b/actor-tests/src/test/scala/org/apache/pekko/serialization/SerializeSpec.scala
@@ -408,6 +408,7 @@ class ReferenceSerializationSpec extends
PekkoSpec(SerializationTests.mostlyRefe
serializerMustBe(classOf[String], classOf[StringSerializer])
serializerMustBe(classOf[ByteString.ByteString1],
classOf[ByteStringSerializer])
serializerMustBe(classOf[ByteString.ByteString1C],
classOf[ByteStringSerializer])
+ serializerMustBe(classOf[ByteString.ByteString2],
classOf[ByteStringSerializer])
serializerMustBe(classOf[ByteString.ByteStrings],
classOf[ByteStringSerializer])
}
@@ -453,6 +454,9 @@ class AllowJavaSerializationSpec extends
PekkoSpec(SerializationTests.allowJavaS
serializerMustBe(classOf[java.lang.Integer], classOf[IntSerializer])
serializerMustBe(classOf[String], classOf[StringSerializer])
serializerMustBe(classOf[ByteString.ByteString1],
classOf[ByteStringSerializer])
+ serializerMustBe(classOf[ByteString.ByteString1C],
classOf[ByteStringSerializer])
+ serializerMustBe(classOf[ByteString.ByteString2],
classOf[ByteStringSerializer])
+ serializerMustBe(classOf[ByteString.ByteStrings],
classOf[ByteStringSerializer])
}
"not support serialization for other classes" in {
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringSpec.scala
index c596dcad6e..a5066552d8 100644
--- a/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringSpec.scala
+++ b/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringSpec.scala
@@ -30,7 +30,7 @@ import org.scalatestplus.scalacheck.Checkers
import org.apache.pekko
import pekko.io.UnsynchronizedByteArrayInputStream
-import pekko.util.ByteString.{ ByteString1, ByteString1C, ByteStrings }
+import pekko.util.ByteString.{ ByteString1, ByteString1C, ByteString2,
ByteStrings }
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
@@ -106,6 +106,18 @@ class ByteStringSpec extends AnyWordSpec with Matchers
with Checkers {
deserialize(serialize(obj)) == obj
}
+ def expectByteString2(byteString: ByteString): ByteString2 =
+ byteString match {
+ case byteString2: ByteString2 => byteString2
+ case other => fail(s"Expected ByteString2, got [${other.getClass.getName}]")
+ }
+
+ def expectByteStrings(byteString: ByteString): ByteStrings =
+ byteString match {
+ case byteStrings: ByteStrings => byteStrings
+ case other => fail(s"Expected ByteStrings, got [${other.getClass.getName}]")
+ }
+
def hexFromSer(obj: AnyRef) = {
val os = new ByteArrayOutputStream
val bos = new ObjectOutputStream(os)
@@ -1749,6 +1761,35 @@ class ByteStringSpec extends AnyWordSpec with Matchers
with Checkers {
"dropping" in { check((a: ByteString, b: ByteString) => (a ++
b).drop(a.size) == b) }
}
+ "preserve concatenation direction" when {
+ "creating ByteString2 from two simple fragments" in {
+ val compact = ByteString1C(Array[Byte](1, 2))
+ val sliced = ByteString1(Array[Byte](0, 3, 4, 5), 1, 2)
+
+ val forward = expectByteString2(compact ++ sliced)
+ forward should ===(ByteString(Array[Byte](1, 2, 3, 4)))
+ forward.asByteBuffers.map(_.remaining()).toSeq should ===(Seq(2, 2))
+
+ val reverse = expectByteString2(sliced ++ compact)
+ reverse should ===(ByteString(Array[Byte](3, 4, 1, 2)))
+ reverse.asByteBuffers.map(_.remaining()).toSeq should ===(Seq(2, 2))
+ }
+
+ "promoting ByteString2 to ByteStrings when prepending or appending
another fragment" in {
+ val middle = expectByteString2(ByteString1C(Array[Byte](2, 3)) ++
ByteString1C(Array[Byte](4, 5)))
+ val prefix = ByteString1C(Array[Byte](1))
+ val suffix = ByteString1C(Array[Byte](6))
+
+ val prepended = expectByteStrings(prefix ++ middle)
+ prepended should ===(ByteString(Array[Byte](1, 2, 3, 4, 5)))
+
+ val appended = expectByteStrings(middle ++ suffix)
+ appended should ===(ByteString(Array[Byte](2, 3, 4, 5, 6)))
+
+ expectByteStrings(prepended ++ suffix) should
===(ByteString(Array[Byte](1, 2, 3, 4, 5, 6)))
+ }
+ }
+
"be equal to the original" when {
"compacting" in {
check { (xs: ByteString) =>
@@ -2038,6 +2079,28 @@ class ByteStringSpec extends AnyWordSpec with Matchers
with Checkers {
deserialize(serialize(bs1)) shouldEqual bs1
deserialize(serialize(bss)) shouldEqual bss
}
+
+ "given ByteString2" in {
+ val original = expectByteString2(ByteString1C(Array[Byte](1, 2, 3)) ++
ByteString1C(Array[Byte](4, 5)))
+
+ deserialize(serialize(original)) match {
+ case deserialized: ByteString2 => deserialized shouldEqual original
+ case other => fail(s"Expected deserialized ByteString2, got [${other.getClass.getName}]")
+ }
+ }
+
+ "given ByteString2 with sliced fragments" in {
+ val left = ByteString1(Array[Byte](0, 1, 2, 3), 1, 2)
+ val right = ByteString1(Array[Byte](4, 5, 6, 0), 0, 3)
+ val original = expectByteString2(left ++ right)
+
+ deserialize(serialize(original)) match {
+ case deserialized: ByteString2 =>
+ deserialized shouldEqual ByteString(Array[Byte](1, 2, 4, 5, 6))
+ deserialized.asByteBuffers.map(_.remaining()).toSeq should
===(Seq(2, 3))
+ case other => fail(s"Expected deserialized ByteString2, got [${other.getClass.getName}]")
+ }
+ }
}
"unsafely wrap and unwrap bytes" in {
@@ -2122,6 +2185,18 @@ class ByteStringSpec extends AnyWordSpec with Matchers
with Checkers {
byteStrings.readLongLE(8) should ===(0x100F0E0D0C0B0A09L)
}
+ "read primitive values across ByteString2 fragment boundaries" in {
+ val byteString2 =
+ expectByteString2(ByteString1C(Array[Byte](1, 2, 3)) ++
ByteString1C(Array[Byte](4, 5, 6, 7, 8)))
+
+ byteString2.readShortBE(2) should ===(0x0304.toShort)
+ byteString2.readShortLE(2) should ===(0x0403.toShort)
+ byteString2.readIntBE(1) should ===(0x02030405)
+ byteString2.readIntLE(1) should ===(0x05040302)
+ byteString2.readLongBE(0) should ===(0x0102030405060708L)
+ byteString2.readLongLE(0) should ===(0x0807060504030201L)
+ }
+
"throw IndexOutOfBoundsException for readShortBE/LE with insufficient
data" in {
val bs1C = ByteString1C(Array[Byte](1))
an[IndexOutOfBoundsException] should be thrownBy bs1C.readShortBE(0)
@@ -2343,6 +2418,132 @@ class ByteStringSpec extends AnyWordSpec with Matchers
with Checkers {
noException should be thrownBy builder.sizeHint(0)
builder.result() should ===(ByteString(42.toByte))
}
+
+ "ByteString2 must behave exactly like ByteString1C across every
read/transform op" in {
+ // Build identical bytes as ByteString1C (reference) and as ByteString2 with several split points;
+ // every public read or transform op MUST return the same value. This guards against regressions
+ // in the iterator/normalize path (e.g. the LazyList -> List fix on ByteString2.iterator).
+ val totalSize = 23 // odd, large enough to exercise short/int/long reads at every fragment boundary
+ val bytes = Array.tabulate[Byte](totalSize)(i => ((i * 31 + 7) &
0xFF).toByte)
+ val reference = ByteString1C(bytes)
+
+ // Try every possible non-empty split: bs2 has fragments [0, split) ++ [split, totalSize)
+ for (split <- 1 until totalSize) {
+ val left = ByteString1C(java.util.Arrays.copyOfRange(bytes, 0, split))
+ val right = ByteString1C(java.util.Arrays.copyOfRange(bytes, split,
totalSize))
+ val bs2 = expectByteString2(left ++ right)
+
+ withClue(s"split=$split: ") {
+ // basic contract
+ bs2.size should ===(reference.size)
+ bs2 should ===(reference)
+ bs2.hashCode should ===(reference.hashCode)
+ bs2.asByteBuffers.map(_.remaining()).toSeq should ===(Seq(split,
totalSize - split))
+
+ // apply(idx) at every index
+ for (i <- 0 until totalSize) bs2(i) should ===(reference(i))
+
+ // iterator: byte-by-byte equality with reference
+ val refIt = reference.iterator
+ val bs2It = bs2.iterator
+ var idx = 0
+ while (refIt.hasNext) {
+ withClue(s"byte at $idx: ") { bs2It.next() should
===(refIt.next()) }
+ idx += 1
+ }
+ bs2It.hasNext should ===(false)
+
+ // iterator.getShort/getInt/getLong at every valid starting offset, BE and LE
+ for (off <- 0 to totalSize - 2) {
+ val r = reference.iterator.drop(off)
+ val b = bs2.iterator.drop(off)
+ r.getShort(BIG_ENDIAN) should ===(b.getShort(BIG_ENDIAN))
+
+ val r2 = reference.iterator.drop(off)
+ val b2 = bs2.iterator.drop(off)
+ r2.getShort(LITTLE_ENDIAN) should ===(b2.getShort(LITTLE_ENDIAN))
+ }
+ for (off <- 0 to totalSize - 4) {
+ val r = reference.iterator.drop(off)
+ val b = bs2.iterator.drop(off)
+ r.getInt(BIG_ENDIAN) should ===(b.getInt(BIG_ENDIAN))
+
+ val r2 = reference.iterator.drop(off)
+ val b2 = bs2.iterator.drop(off)
+ r2.getInt(LITTLE_ENDIAN) should ===(b2.getInt(LITTLE_ENDIAN))
+ }
+ for (off <- 0 to totalSize - 8) {
+ val r = reference.iterator.drop(off)
+ val b = bs2.iterator.drop(off)
+ r.getLong(BIG_ENDIAN) should ===(b.getLong(BIG_ENDIAN))
+
+ val r2 = reference.iterator.drop(off)
+ val b2 = bs2.iterator.drop(off)
+ r2.getLong(LITTLE_ENDIAN) should ===(b2.getLong(LITTLE_ENDIAN))
+ }
+
+ // direct readShort/Int/Long at every valid offset, BE and LE
+ for (off <- 0 to totalSize - 2) {
+ bs2.readShortBE(off) should ===(reference.readShortBE(off))
+ bs2.readShortLE(off) should ===(reference.readShortLE(off))
+ }
+ for (off <- 0 to totalSize - 4) {
+ bs2.readIntBE(off) should ===(reference.readIntBE(off))
+ bs2.readIntLE(off) should ===(reference.readIntLE(off))
+ }
+ for (off <- 0 to totalSize - 8) {
+ bs2.readLongBE(off) should ===(reference.readLongBE(off))
+ bs2.readLongLE(off) should ===(reference.readLongLE(off))
+ }
+
+ // take/drop/slice at every cut, including fragment boundary
+ for (n <- 0 to totalSize) {
+ bs2.take(n) should ===(reference.take(n))
+ bs2.drop(n) should ===(reference.drop(n))
+ bs2.takeRight(n) should ===(reference.takeRight(n))
+ bs2.dropRight(n) should ===(reference.dropRight(n))
+ }
+ for {
+ from <- 0 to totalSize
+ until <- from to totalSize
+ } bs2.slice(from, until) should ===(reference.slice(from, until))
+
+ // copyToBuffer
+ val refBuf = ByteBuffer.allocate(totalSize)
+ val bs2Buf = ByteBuffer.allocate(totalSize)
+ val refCopied = reference.copyToBuffer(refBuf)
+ val bs2Copied = bs2.copyToBuffer(bs2Buf)
+ bs2Copied should ===(refCopied)
+ refBuf.flip(); bs2Buf.flip()
+ bs2Buf should ===(refBuf)
+
+ // copyToArray with various (start, len) windows
+ for {
+ start <- 0 to totalSize
+ len <- 0 to totalSize - start
+ } {
+ val refArr = new Array[Byte](totalSize)
+ val bs2Arr = new Array[Byte](totalSize)
+ val nRef = reference.copyToArray(refArr, start, len)
+ val nBs2 = bs2.copyToArray(bs2Arr, start, len)
+ nBs2 should ===(nRef)
+ bs2Arr.toSeq should ===(refArr.toSeq)
+ }
+
+ // indexOf for every distinct byte value present, plus a few absent
+ val present = bytes.toSet
+ val candidates = present ++ Set[Byte](Byte.MinValue, 0.toByte,
Byte.MaxValue)
+ for (b <- candidates) {
+ bs2.indexOf(b) should ===(reference.indexOf(b))
+ for (from <- 0 to totalSize) bs2.indexOf(b, from) should
===(reference.indexOf(b, from))
+ }
+
+ // compact yields equivalent ByteString1C
+ bs2.compact should ===(reference)
+ bs2.compact.isCompact should ===(true)
+ }
+ }
+ }
}
"A ByteStringIterator" must {
diff --git a/actor/src/main/resources/reference.conf
b/actor/src/main/resources/reference.conf
index 5474a38432..5b4add60f3 100644
--- a/actor/src/main/resources/reference.conf
+++ b/actor/src/main/resources/reference.conf
@@ -853,6 +853,7 @@ pekko {
"java.lang.String" = primitive-string
"org.apache.pekko.util.ByteString$ByteString1C" = primitive-bytestring
"org.apache.pekko.util.ByteString$ByteString1" = primitive-bytestring
+ "org.apache.pekko.util.ByteString$ByteString2" = primitive-bytestring
"org.apache.pekko.util.ByteString$ByteStrings" = primitive-bytestring
"java.lang.Long" = primitive-long
"scala.Long" = primitive-long
diff --git a/actor/src/main/scala/org/apache/pekko/util/ByteString.scala
b/actor/src/main/scala/org/apache/pekko/util/ByteString.scala
index 0d4984a2fa..44db0e80c7 100644
--- a/actor/src/main/scala/org/apache/pekko/util/ByteString.scala
+++ b/actor/src/main/scala/org/apache/pekko/util/ByteString.scala
@@ -187,6 +187,8 @@ object ByteString {
final class ByteString1C private (private val bytes: Array[Byte]) extends
CompactByteString {
def apply(idx: Int): Byte = bytes(idx)
+ private[pekko] override def byteAtUnchecked(offset: Int): Byte =
bytes(offset)
+
override def length: Int = bytes.length
// Avoid `iterator` in performance sensitive code, call ops directly on
ByteString instead
@@ -217,7 +219,13 @@ object ByteString {
override def ++(that: ByteString): ByteString = {
if (that.isEmpty) this
else if (this.isEmpty) that
- else toByteString1 ++ that
+ else
+ that match {
+ case b: ByteString1C =>
ByteString2.fromNonEmptySimpleFragments(this, b)
+ case b: ByteString1 =>
ByteString2.fromNonEmptySimpleFragments(this, b)
+ case bs: ByteString2 => ByteStrings(this, bs)
+ case bs: ByteStrings => ByteStrings(toByteString1, bs)
+ }
}
override def take(n: Int): ByteString =
@@ -406,7 +414,8 @@ object ByteString {
* INTERNAL API: compare `len` bytes from this ByteString starting at
`haystackOffset`
* against `needle[needleOffset..needleOffset+len)`.
*/
- private[pekko] def matchesAt(haystackOffset: Int, needle: Array[Byte],
needleOffset: Int, len: Int): Boolean = {
+ private[pekko] override def matchesAt(
+ haystackOffset: Int, needle: Array[Byte], needleOffset: Int, len:
Int): Boolean = {
var hIdx = haystackOffset
var nIdx = needleOffset
var remaining = len
@@ -518,6 +527,8 @@ object ByteString {
def apply(idx: Int): Byte = bytes(checkRangeConvert(idx))
+ private[pekko] override def byteAtUnchecked(offset: Int): Byte =
bytes(startIndex + offset)
+
// Avoid `iterator` in performance sensitive code, call ops directly on
ByteString instead
override def iterator: ByteIterator.ByteArrayIterator =
ByteIterator.ByteArrayIterator(bytes, startIndex, startIndex + length)
@@ -633,11 +644,12 @@ object ByteString {
else if (this.isEmpty) that
else
that match {
- case b: ByteString1C => ByteStrings(this, b.toByteString1)
+ case b: ByteString1C =>
ByteString2.fromNonEmptySimpleFragments(this, b)
case b: ByteString1 =>
if ((bytes eq b.bytes) && (startIndex + length == b.startIndex))
new ByteString1(bytes, startIndex, length + b.length)
- else ByteStrings(this, b)
+ else ByteString2.fromNonEmptySimpleFragments(this, b)
+ case bs: ByteString2 => ByteStrings(this, bs)
case bs: ByteStrings => ByteStrings(this, bs)
}
}
@@ -818,7 +830,8 @@ object ByteString {
* INTERNAL API: compare `len` bytes from this ByteString starting at
logical `haystackOffset`
* against `needle[needleOffset..needleOffset+len)`.
*/
- private[pekko] def matchesAt(haystackOffset: Int, needle: Array[Byte],
needleOffset: Int, len: Int): Boolean = {
+ private[pekko] override def matchesAt(
+ haystackOffset: Int, needle: Array[Byte], needleOffset: Int, len:
Int): Boolean = {
var hIdx = startIndex + haystackOffset
var nIdx = needleOffset
var remaining = len
@@ -886,17 +899,35 @@ object ByteString {
private[pekko] object ByteStrings extends Companion {
def apply(bytestrings: Vector[ByteString1]): ByteString =
- new ByteStrings(bytestrings, bytestrings.foldLeft(0)(_ + _.length))
-
- def apply(bytestrings: Vector[ByteString1], length: Int): ByteString = new
ByteStrings(bytestrings, length)
+ apply(bytestrings, bytestrings.foldLeft(0)(_ + _.length))
+
+ def apply(bytestrings: Vector[ByteString1], length: Int): ByteString =
+ bytestrings.length match {
+ case 0 =>
+ assert(length == 0, s"empty ByteStrings length must be 0, was $length")
+ ByteString.empty
+ case 1 =>
+ val only = bytestrings.head
+ assert(only.length == length, s"single ByteString length ${only.length} did not match $length")
+ only
+ case 2 => ByteString2(bytestrings(0), bytestrings(1), length)
+ case _ => new ByteStrings(bytestrings, length)
+ }
def apply(b1: ByteString1, b2: ByteString1): ByteString = compare(b1, b2)
match {
- case 3 => new ByteStrings(Vector(b1, b2), b1.length + b2.length)
+ case 3 => ByteString2(b1, b2)
case 2 => b2
case 1 => b1
case 0 => ByteString.empty
}
+ def apply(b: ByteString, bs: ByteString2): ByteString = compare(b, bs)
match {
+ case 3 => new ByteStrings(addFragments(b, bs), b.length + bs.length)
+ case 2 => bs
+ case 1 => b
+ case 0 => ByteString.empty
+ }
+
def apply(b: ByteString1, bs: ByteStrings): ByteString = compare(b, bs)
match {
case 3 => new ByteStrings(b +: bs.bytestrings, bs.length + b.length)
case 2 => bs
@@ -904,6 +935,13 @@ object ByteString {
case 0 => ByteString.empty
}
+ def apply(bs: ByteString2, b: ByteString): ByteString = compare(bs, b)
match {
+ case 3 => new ByteStrings(addFragments(bs, b), bs.length + b.length)
+ case 2 => b
+ case 1 => bs
+ case 0 => ByteString.empty
+ }
+
def apply(bs: ByteStrings, b: ByteString1): ByteString = compare(bs, b)
match {
case 3 => new ByteStrings(bs.bytestrings :+ b, bs.length + b.length)
case 2 => b
@@ -911,6 +949,27 @@ object ByteString {
case 0 => ByteString.empty
}
+ def apply(bs: ByteStrings, b: ByteString2): ByteString = compare(bs, b)
match {
+ case 3 => new ByteStrings(addFragments(bs, b), bs.length + b.length)
+ case 2 => b
+ case 1 => bs
+ case 0 => ByteString.empty
+ }
+
+ def apply(b: ByteString2, bs: ByteStrings): ByteString = compare(b, bs)
match {
+ case 3 => new ByteStrings(addFragments(b, bs), b.length + bs.length)
+ case 2 => bs
+ case 1 => b
+ case 0 => ByteString.empty
+ }
+
+ def apply(b1: ByteString2, b2: ByteString2): ByteString = compare(b1, b2)
match {
+ case 3 => new ByteStrings(addFragments(b1, b2), b1.length + b2.length)
+ case 2 => b2
+ case 1 => b1
+ case 0 => ByteString.empty
+ }
+
def apply(bs1: ByteStrings, bs2: ByteStrings): ByteString = compare(bs1,
bs2) match {
case 3 => new ByteStrings(bs1.bytestrings ++ bs2.bytestrings, bs1.length
+ bs2.length)
case 2 => bs2
@@ -925,9 +984,24 @@ object ByteString {
else if (b2.isEmpty) 1
else 3
+ private[ByteString] def addFragments(first: ByteString, second:
ByteString): Vector[ByteString1] = {
+ val builder = new VectorBuilder[ByteString1]
+ addFragments(builder, first)
+ addFragments(builder, second)
+ builder.result()
+ }
+
+ private[ByteString] def addFragments(builder: VectorBuilder[ByteString1],
byteString: ByteString): Unit =
+ byteString match {
+ case b: ByteString1C => builder += b.toByteString1
+ case b: ByteString1 => builder += b
+ case b: ByteString2 => b.addFragmentsTo(builder)
+ case b: ByteStrings => builder ++= b.bytestrings
+ }
+
val SerializationIdentity = 2.toByte
- def readFromInputStream(is: ObjectInputStream): ByteStrings = {
+ def readFromInputStream(is: ObjectInputStream): ByteString = {
val nByteStrings = is.readInt()
val builder = new VectorBuilder[ByteString1]
@@ -942,10 +1016,418 @@ object ByteString {
length += bs.length
i += 1
}
- new ByteStrings(builder.result(), length)
+ ByteStrings(builder.result(), length)
}
}
+ private[pekko] object ByteString2 extends Companion {
+ val SerializationIdentity = 3.toByte
+
+ private[ByteString] def fromNonEmptySimpleFragments(first: ByteString,
second: ByteString): ByteString =
+ fromNonEmptySimpleFragments(first, second, first.length + second.length)
+
+ private[ByteString] def fromNonEmptySimpleFragments(
+ first: ByteString,
+ second: ByteString,
+ length: Int): ByteString =
+ new ByteString2(first, second, length)
+
+ def apply(first: ByteString, second: ByteString): ByteString =
+ apply(first, second, first.length + second.length)
+
+ private[ByteString] def apply(first: ByteString, second: ByteString,
length: Int): ByteString =
+ ByteStrings.compare(first, second) match {
+ case 3 =>
+ if (isSimpleFragment(first) && isSimpleFragment(second))
+ fromNonEmptySimpleFragments(first, second, length)
+ else
+ ByteStrings(ByteStrings.addFragments(first, second), length)
+ case 2 =>
+ assert(second.length == length, s"second ByteString length ${second.length} did not match $length")
+ second
+ case 1 =>
+ assert(first.length == length, s"first ByteString length ${first.length} did not match $length")
+ first
+ case 0 =>
+ assert(length == 0, s"empty ByteString length must be 0, was $length")
+ ByteString.empty
+ }
+
+ private def isSimpleFragment(byteString: ByteString): Boolean =
+ byteString match {
+ case _: ByteString1C | _: ByteString1 => true
+ case _: ByteString2 | _: ByteStrings => false
+ }
+
+ def readFromInputStream(is: ObjectInputStream): ByteString =
+ ByteString2(ByteString1.readFromInputStream(is),
ByteString1.readFromInputStream(is))
+ }
+
+ /**
+ * A ByteString with exactly two simple fragments.
+ */
+ private[pekko] final class ByteString2 private (
+ private val first: ByteString,
+ private val second: ByteString,
+ val length: Int)
+ extends ByteString
+ with Serializable {
+ assert(ByteString2.isSimpleFragment(first),
+ s"first must be a simple ByteString fragment, was ${first.getClass.getName}")
+ assert(
+ ByteString2.isSimpleFragment(second),
+ s"second must be a simple ByteString fragment, was ${second.getClass.getName}")
+ assert(first.nonEmpty, "first must not be empty")
+ assert(second.nonEmpty, "second must not be empty")
+ assert(
+ first.length + second.length == length,
+ s"ByteString2 length ${first.length + second.length} did not match $length")
+
+ private[this] val firstLength: Int = first.length
+
+ def apply(idx: Int): Byte =
+ if (0 <= idx && idx < length) {
+ byteAtUnchecked(idx)
+ } else throw new IndexOutOfBoundsException(idx.toString)
+
+ /** Avoid `iterator` in performance sensitive code, call ops directly on ByteString instead */
+ override def iterator: ByteIterator.MultiByteArrayIterator =
+ ByteIterator.MultiByteArrayIterator(byteArrayIterator(first) ::
byteArrayIterator(second) :: Nil)
+
+ private def byteArrayIterator(byteString: ByteString):
ByteIterator.ByteArrayIterator =
+ byteString match {
+ case b: ByteString1C => b.iterator
+ case b: ByteString1 => b.iterator
+ case _: ByteString2 | _: ByteStrings =>
+ throw new IllegalStateException("ByteString2 fragments must be compact or sliced ByteStrings")
+ }
+
+ def ++(that: ByteString): ByteString = {
+ if (that.isEmpty) this
+ else if (this.isEmpty) that
+ else
+ that match {
+ case b: ByteString1C => ByteStrings(this, b)
+ case b: ByteString1 => ByteStrings(this, b)
+ case bs: ByteString2 => ByteStrings(this, bs)
+ case bs: ByteStrings => ByteStrings(this, bs)
+ }
+ }
+
+ private[pekko] def byteStringCompanion = ByteString2
+
+ def isCompact: Boolean = false
+
+ override def copyToBuffer(buffer: ByteBuffer): Int = {
+ val written = first.copyToBuffer(buffer)
+ if (buffer.hasRemaining) written + second.copyToBuffer(buffer)
+ else written
+ }
+
+ override def copyToBuffer(buffer: ByteBuffer, offset: Int): Int =
+ if (offset <= 0) {
+ val written = first.copyToBuffer(buffer)
+ if (buffer.hasRemaining) written + second.copyToBuffer(buffer)
+ else written
+ } else if (offset >= firstLength) {
+ second.copyToBuffer(buffer, offset - firstLength)
+ } else {
+ val written = first.copyToBuffer(buffer, offset)
+ if (buffer.hasRemaining) written + second.copyToBuffer(buffer)
+ else written
+ }
+
+ def compact: CompactByteString = {
+ val ar = new Array[Byte](length)
+ first.copyToArray(ar, 0, firstLength)
+ second.copyToArray(ar, firstLength, length - firstLength)
+ ByteString1C(ar)
+ }
+
+ def asByteBuffer: ByteBuffer = compact.asByteBuffer
+
+ def asByteBuffers: scala.collection.immutable.Iterable[ByteBuffer] =
+ List(first.asByteBuffer, second.asByteBuffer)
+
+ override def asInputStream: InputStream =
+ new SequenceInputStream(Iterator(first.asInputStream,
second.asInputStream).asJavaEnumeration)
+
+ def decodeString(charset: String): String = compact.decodeString(charset)
+
+ def decodeString(charset: Charset): String = compact.decodeString(charset)
+
+ override def decodeBase64: ByteString = compact.decodeBase64
+
+ override def encodeBase64: ByteString = compact.encodeBase64
+
+ private[pekko] def writeToOutputStream(os: ObjectOutputStream): Unit = {
+ first.writeToOutputStream(os)
+ second.writeToOutputStream(os)
+ }
+
+ override def take(n: Int): ByteString =
+ if (n <= 0) ByteString.empty
+ else if (n >= length) this
+ else {
+ if (n <= firstLength) first.take(n)
+ else ByteString2(first, second.take(n - firstLength), n)
+ }
+
+ override def dropRight(n: Int): ByteString =
+ if (0 < n && n < length) {
+ val secondLength = length - firstLength
+ if (n < secondLength) ByteString2(first, second.dropRight(n), length - n)
+ else first.dropRight(n - secondLength)
+ } else if (n >= length) ByteString.empty
+ else this
+
+ override def slice(from: Int, until: Int): ByteString = {
+ val lo = math.max(from, 0)
+ val hi = math.min(until, length)
+ if (lo >= hi) ByteString.empty
+ else if (lo == 0 && hi == length) this
+ else drop(lo).take(hi - lo)
+ }
+
+ override def drop(n: Int): ByteString =
+ if (n <= 0) this
+ else if (n >= length) ByteString.empty
+ else {
+ if (n < firstLength) ByteString2(first.drop(n), second, length - n)
+ else second.drop(n - firstLength)
+ }
+
+ override def indexOf[B >: Byte](elem: B, from: Int): Int =
+ if (from >= length) -1
+ else {
+ val start = math.max(from, 0)
+ if (start < firstLength) {
+ val firstIndex = first.indexOf(elem, start)
+ if (firstIndex >= 0) firstIndex
+ else {
+ val secondIndex = second.indexOf(elem, 0)
+ if (secondIndex >= 0) firstLength + secondIndex else -1
+ }
+ } else {
+ val secondIndex = second.indexOf(elem, start - firstLength)
+ if (secondIndex >= 0) firstLength + secondIndex else -1
+ }
+ }
+
+ override def indexOf(elem: Byte, from: Int): Int =
+ if (from >= length) -1
+ else {
+ val start = math.max(from, 0)
+ if (start < firstLength) {
+ val firstIndex = first.indexOf(elem, start)
+ if (firstIndex >= 0) firstIndex
+ else {
+ val secondIndex = second.indexOf(elem, 0)
+ if (secondIndex >= 0) firstLength + secondIndex else -1
+ }
+ } else {
+ val secondIndex = second.indexOf(elem, start - firstLength)
+ if (secondIndex >= 0) firstLength + secondIndex else -1
+ }
+ }
+
+ override def indexOf(elem: Byte, from: Int, to: Int): Int = {
+ val start = math.max(from, 0)
+ val end = math.min(to, length)
+ if (start >= end) -1
+ else {
+ if (start < firstLength) {
+ val firstIndex = first.indexOf(elem, start, math.min(end, firstLength))
+ if (firstIndex >= 0) firstIndex
+ else if (end > firstLength) {
+ val secondIndex = second.indexOf(elem, 0, end - firstLength)
+ if (secondIndex >= 0) firstLength + secondIndex else -1
+ } else -1
+ } else {
+ val secondIndex = second.indexOf(elem, start - firstLength, end - firstLength)
+ if (secondIndex >= 0) firstLength + secondIndex else -1
+ }
+ }
+ }
+
+ override def lastIndexOf[B >: Byte](elem: B, end: Int): Int =
+ if (end < 0) -1
+ else {
+ val cappedEnd = math.min(end, length - 1)
+ if (cappedEnd >= firstLength) {
+ val secondIndex = second.lastIndexOf(elem, cappedEnd - firstLength)
+ if (secondIndex >= 0) firstLength + secondIndex
+ else first.lastIndexOf(elem, firstLength - 1)
+ } else first.lastIndexOf(elem, cappedEnd)
+ }
+
+ override def lastIndexOf(elem: Byte, end: Int): Int =
+ if (end < 0) -1
+ else {
+ val cappedEnd = math.min(end, length - 1)
+ if (cappedEnd >= firstLength) {
+ val secondIndex = second.lastIndexOf(elem, cappedEnd - firstLength)
+ if (secondIndex >= 0) firstLength + secondIndex
+ else first.lastIndexOf(elem, firstLength - 1)
+ } else first.lastIndexOf(elem, cappedEnd)
+ }
+
+ override def copyToArray[B >: Byte](dest: Array[B], start: Int, len: Int): Int = {
+ val totalToCopy = math.max(0, math.min(math.min(len, length), dest.length - start))
+ if (totalToCopy > 0) {
+ val firstCopied = first.copyToArray(dest, start, totalToCopy)
+ if (firstCopied < totalToCopy)
+ second.copyToArray(dest, start + firstCopied, totalToCopy - firstCopied)
+ }
+ totalToCopy
+ }
+
+ override def foreach[@specialized U](f: Byte => U): Unit = {
+ first.foreach(f)
+ second.foreach(f)
+ }
+
+ override def startsWith(bytes: Array[Byte], offset: Int): Boolean = {
+ val needleLen = bytes.length
+ if (length - offset < needleLen) return false
+ if (needleLen == 0) return true
+ val lo = math.max(offset, 0)
+ if (lo >= firstLength) {
+ // range starts entirely in second
+ second.matchesAt(lo - firstLength, bytes, 0, needleLen)
+ } else if (lo + needleLen <= firstLength) {
+ // range ends entirely in first
+ first.matchesAt(lo, bytes, 0, needleLen)
+ } else {
+ // range spans both fragments
+ val firstPart = firstLength - lo
+ first.matchesAt(lo, bytes, 0, firstPart) &&
+ second.matchesAt(0, bytes, firstPart, needleLen - firstPart)
+ }
+ }
+
+ override def endsWith(bytes: Array[Byte]): Boolean = {
+ val needleLen = bytes.length
+ if (length < needleLen) return false
+ if (needleLen == 0) return true
+ val startPos = length - needleLen
+ if (startPos >= firstLength) {
+ // range is entirely in second
+ second.matchesAt(startPos - firstLength, bytes, 0, needleLen)
+ } else {
+ // range spans both fragments
+ val firstPart = firstLength - startPos
+ first.matchesAt(startPos, bytes, 0, firstPart) &&
+ second.matchesAt(0, bytes, firstPart, needleLen - firstPart)
+ }
+ }
+
+ override def map[A](f: Byte => Byte): ByteString = {
+ val result = new Array[Byte](length)
+ val secondLen = length - firstLength
+ var i = 0
+ while (i < firstLength) {
+ result(i) = f(first.byteAtUnchecked(i))
+ i += 1
+ }
+ var j = 0
+ while (j < secondLen) {
+ result(firstLength + j) = f(second.byteAtUnchecked(j))
+ j += 1
+ }
+ ByteString1C(result)
+ }
+
+ private[this] def byteAt(offset: Int, firstLength: Int): Byte =
+ if (offset < firstLength) first.byteAtUnchecked(offset) else second.byteAtUnchecked(offset - firstLength)
+
+ private[pekko] override def byteAtUnchecked(offset: Int): Byte = {
+ val firstLength = this.firstLength
+ byteAt(offset, firstLength)
+ }
+
+ private[pekko] override def readShortBEUnchecked(offset: Int): Short = {
+ val firstLen = firstLength
+ if (offset + java.lang.Short.BYTES <= firstLen) first.readShortBEUnchecked(offset)
+ else if (offset >= firstLen) second.readShortBEUnchecked(offset - firstLen)
+ else SWARUtil.getShortBE(byteAt(offset, firstLen), byteAt(offset + 1, firstLen))
+ }
+
+ private[pekko] override def readShortLEUnchecked(offset: Int): Short = {
+ val firstLen = firstLength
+ if (offset + java.lang.Short.BYTES <= firstLen) first.readShortLEUnchecked(offset)
+ else if (offset >= firstLen) second.readShortLEUnchecked(offset - firstLen)
+ else SWARUtil.getShortLE(byteAt(offset, firstLen), byteAt(offset + 1, firstLen))
+ }
+
+ private[pekko] override def readIntBEUnchecked(offset: Int): Int = {
+ val firstLen = firstLength
+ if (offset + java.lang.Integer.BYTES <= firstLen) first.readIntBEUnchecked(offset)
+ else if (offset >= firstLen) second.readIntBEUnchecked(offset - firstLen)
+ else {
+ SWARUtil.getIntBE(
+ byteAt(offset, firstLen),
+ byteAt(offset + 1, firstLen),
+ byteAt(offset + 2, firstLen),
+ byteAt(offset + 3, firstLen))
+ }
+ }
+
+ private[pekko] override def readIntLEUnchecked(offset: Int): Int = {
+ val firstLen = firstLength
+ if (offset + java.lang.Integer.BYTES <= firstLen) first.readIntLEUnchecked(offset)
+ else if (offset >= firstLen) second.readIntLEUnchecked(offset - firstLen)
+ else {
+ SWARUtil.getIntLE(
+ byteAt(offset, firstLen),
+ byteAt(offset + 1, firstLen),
+ byteAt(offset + 2, firstLen),
+ byteAt(offset + 3, firstLen))
+ }
+ }
+
+ private[pekko] override def readLongBEUnchecked(offset: Int): Long = {
+ val firstLen = firstLength
+ if (offset + java.lang.Long.BYTES <= firstLen) first.readLongBEUnchecked(offset)
+ else if (offset >= firstLen) second.readLongBEUnchecked(offset - firstLen)
+ else {
+ SWARUtil.getLongBE(
+ byteAt(offset, firstLen),
+ byteAt(offset + 1, firstLen),
+ byteAt(offset + 2, firstLen),
+ byteAt(offset + 3, firstLen),
+ byteAt(offset + 4, firstLen),
+ byteAt(offset + 5, firstLen),
+ byteAt(offset + 6, firstLen),
+ byteAt(offset + 7, firstLen))
+ }
+ }
+
+ private[pekko] override def readLongLEUnchecked(offset: Int): Long = {
+ val firstLen = firstLength
+ if (offset + java.lang.Long.BYTES <= firstLen) first.readLongLEUnchecked(offset)
+ else if (offset >= firstLen) second.readLongLEUnchecked(offset - firstLen)
+ else {
+ SWARUtil.getLongLE(
+ byteAt(offset, firstLen),
+ byteAt(offset + 1, firstLen),
+ byteAt(offset + 2, firstLen),
+ byteAt(offset + 3, firstLen),
+ byteAt(offset + 4, firstLen),
+ byteAt(offset + 5, firstLen),
+ byteAt(offset + 6, firstLen),
+ byteAt(offset + 7, firstLen))
+ }
+ }
+
+ private[pekko] def addFragmentsTo(builder: VectorBuilder[ByteString1]): Unit = {
+ ByteStrings.addFragments(builder, first)
+ ByteStrings.addFragments(builder, second)
+ }
+
+ protected def writeReplace(): AnyRef = new SerializationProxy(this)
+ }
+
/**
* A ByteString with 2 or more fragments.
*/
@@ -980,6 +1462,7 @@ object ByteString {
that match {
case b: ByteString1C => ByteStrings(this, b.toByteString1)
case b: ByteString1 => ByteStrings(this, b)
+ case bs: ByteString2 => ByteStrings(this, bs)
case bs: ByteStrings => ByteStrings(this, bs)
}
}
@@ -1362,7 +1845,7 @@ object ByteString {
}
private[pekko] object Companion {
- private val companionMap = Seq(ByteString1, ByteString1C, ByteStrings)
+ private val companionMap = Seq(ByteString1, ByteString1C, ByteStrings, ByteString2)
.map(x => x.SerializationIdentity -> x)
.toMap
.withDefault(x => throw new IllegalArgumentException("Invalid serialization id " + x))
@@ -1804,6 +2287,23 @@ sealed abstract class ByteString
}
}
+ /**
+ * INTERNAL API: compare `len` bytes from this ByteString starting at `haystackOffset`
+ * against `needle[needleOffset..needleOffset+len)`.
+ * Overridden by ByteString1C and ByteString1 with SWAR-based 8-byte comparisons.
+ */
+ private[pekko] def matchesAt(haystackOffset: Int, needle: Array[Byte], needleOffset: Int, len: Int): Boolean = {
+ var hIdx = haystackOffset
+ var nIdx = needleOffset
+ val end = needleOffset + len
+ while (nIdx < end) {
+ if (apply(hIdx) != needle(nIdx)) return false
+ hIdx += 1
+ nIdx += 1
+ }
+ true
+ }
+
override def grouped(size: Int): Iterator[ByteString] = {
if (size <= 0) {
throw new IllegalArgumentException(s"size=$size must be positive")
@@ -2067,6 +2567,13 @@ sealed abstract class ByteString
readLongLEUnchecked(offset)
}
+ /**
+ * INTERNAL API
+ * Fast byte access for callers that have already checked bounds.
+ */
+ private[pekko] def byteAtUnchecked(offset: Int): Byte =
+ apply(offset)
+
/**
* INTERNAL API
* Optimized in subclasses when we have byte arrays where we can use {@link SWARUtil}
@@ -2249,7 +2756,7 @@ sealed abstract class CompactByteString extends ByteString with Serializable {
final class ByteStringBuilder extends Builder[Byte, ByteString] {
builder =>
- import ByteString.{ ByteString1, ByteString1C, ByteStrings }
+ import ByteString.{ ByteString1, ByteString1C, ByteString2, ByteStrings }
private var _length: Int = 0
private val _builder: VectorBuilder[ByteString1] = new VectorBuilder[ByteString1]()
private var _temp: Array[Byte] = _
@@ -2329,6 +2836,9 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] {
case b: ByteString1 =>
_builder += b
_length += b.length
+ case b: ByteString2 =>
+ b.addFragmentsTo(_builder)
+ _length += b.length
case bs: ByteStrings =>
_builder ++= bs.bytestrings
_length += bs.length
diff --git a/actor/src/main/scala/org/apache/pekko/util/SWARUtil.scala b/actor/src/main/scala/org/apache/pekko/util/SWARUtil.scala
index 7dc9d70d58..76e01c1a34 100644
--- a/actor/src/main/scala/org/apache/pekko/util/SWARUtil.scala
+++ b/actor/src/main/scala/org/apache/pekko/util/SWARUtil.scala
@@ -357,6 +357,56 @@ private[pekko] object SWARUtil {
private[pekko] def getShortLEWithoutMethodHandle(array: Array[Byte], index: Int): Short =
((array(index) & 0xFF) | (array(index + 1) & 0xFF) << 8).toShort
+ private[pekko] def getShortBE(b0: Byte, b1: Byte): Short = ((b0 & 0xFF) << 8 | (b1 & 0xFF)).toShort
+
+ private[pekko] def getShortLE(b0: Byte, b1: Byte): Short = ((b0 & 0xFF) | (b1 & 0xFF) << 8).toShort
+
+ private[pekko] def getIntBE(b0: Byte, b1: Byte, b2: Byte, b3: Byte): Int = (b0 & 0xFF) << 24 |
+ (b1 & 0xFF) << 16 |
+ (b2 & 0xFF) << 8 |
+ (b3 & 0xFF)
+
+ private[pekko] def getIntLE(b0: Byte, b1: Byte, b2: Byte, b3: Byte): Int = (b0 & 0xFF) |
+ (b1 & 0xFF) << 8 |
+ (b2 & 0xFF) << 16 |
+ (b3 & 0xFF) << 24
+
+ private[pekko] def getLongBE(
+ b0: Byte,
+ b1: Byte,
+ b2: Byte,
+ b3: Byte,
+ b4: Byte,
+ b5: Byte,
+ b6: Byte,
+ b7: Byte): Long =
+ (b0.toLong & 0xFFL) << 56 |
+ (b1.toLong & 0xFFL) << 48 |
+ (b2.toLong & 0xFFL) << 40 |
+ (b3.toLong & 0xFFL) << 32 |
+ (b4.toLong & 0xFFL) << 24 |
+ (b5.toLong & 0xFFL) << 16 |
+ (b6.toLong & 0xFFL) << 8 |
+ (b7.toLong & 0xFFL)
+
+ private[pekko] def getLongLE(
+ b0: Byte,
+ b1: Byte,
+ b2: Byte,
+ b3: Byte,
+ b4: Byte,
+ b5: Byte,
+ b6: Byte,
+ b7: Byte): Long =
+ (b0.toLong & 0xFFL) |
+ (b1.toLong & 0xFFL) << 8 |
+ (b2.toLong & 0xFFL) << 16 |
+ (b3.toLong & 0xFFL) << 24 |
+ (b4.toLong & 0xFFL) << 32 |
+ (b5.toLong & 0xFFL) << 40 |
+ (b6.toLong & 0xFFL) << 48 |
+ (b7.toLong & 0xFFL) << 56
+
private[pekko] def putIntBEWithoutMethodHandle(array: Array[Byte], index: Int, value: Int): Unit = {
array(index) = (value >>> 24).toByte
array(index + 1) = (value >>> 16).toByte
diff --git a/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_append_Benchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_append_Benchmark.scala
index 23e2327271..69262bc48d 100644
--- a/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_append_Benchmark.scala
+++ b/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_append_Benchmark.scala
@@ -13,6 +13,7 @@
package org.apache.pekko.util
+import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations._
@@ -27,6 +28,15 @@ import org.openjdk.jmh.infra.Blackhole
class ByteString_append_Benchmark {
private val bs = ByteString(Array.ofDim[Byte](10))
+ private val header = ByteString.fromArrayUnsafe(Array.tabulate[Byte](9)(i => (i + 1).toByte))
+ private val payload = ByteString.fromArrayUnsafe(Array.fill[Byte](4096)(42))
+ private val frame = header ++ payload
+ private val shortHeaderPrefix = ByteString.fromArrayUnsafe(Array[Byte](1, 2))
+ private val headerTailAndPayload = ByteString.fromArrayUnsafe(Array.tabulate[Byte](4096 + 7)(i => (i + 3).toByte))
+ private val crossBoundaryFrame = shortHeaderPrefix ++ headerTailAndPayload
+ private val compactFrame = frame.compact
+ private val outputBuffer = ByteBuffer.allocateDirect(9 + 4096)
+ private val outputArray = new Array[Byte](9 + 4096)
@Benchmark
@OperationsPerInvocation(10000)
@@ -55,6 +65,44 @@ class ByteString_append_Benchmark {
}
bh.consume(result)
}
+
+ @Benchmark
+ def appendTwo(): ByteString =
+ header ++ payload
+
+ @Benchmark
+ def readTwoPartHeader(): Int =
+ frame.readIntBE(0) + frame.readIntBE(4) + frame(8)
+
+ @Benchmark
+ def readTwoPartCrossBoundaryHeader(): Int =
+ crossBoundaryFrame.readIntBE(0) + crossBoundaryFrame.readIntBE(2) + crossBoundaryFrame(8)
+
+ @Benchmark
+ def readCompactHeader(): Int =
+ compactFrame.readIntBE(0) + compactFrame.readIntBE(4) + compactFrame(8)
+
+ @Benchmark
+ def appendTwoAndReadHeader(): Int = {
+ val frame = header ++ payload
+ frame.readIntBE(0) + frame.readIntBE(4) + frame(8)
+ }
+
+ @Benchmark
+ def appendTwoAndReadCrossBoundaryHeader(): Int = {
+ val frame = shortHeaderPrefix ++ headerTailAndPayload
+ frame.readIntBE(0) + frame.readIntBE(2) + frame(8)
+ }
+
+ @Benchmark
+ def appendTwoAndCopyToBuffer(): Int = {
+ outputBuffer.clear()
+ (header ++ payload).copyToBuffer(outputBuffer)
+ }
+
+ @Benchmark
+ def appendTwoAndCopyToArray(): Int = (header ++ payload).copyToArray(outputArray, 0, outputArray.length)
+
@Benchmark
@OperationsPerInvocation(10000)
def builderOne(bh: Blackhole): Unit = {
diff --git a/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_concat3way_Benchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_concat3way_Benchmark.scala
new file mode 100644
index 0000000000..e4841dc064
--- /dev/null
+++ b/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_concat3way_Benchmark.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+package org.apache.pekko.util
+
+import java.nio.{ ByteBuffer, ByteOrder }
+import java.util.concurrent.TimeUnit
+
+import org.openjdk.jmh.annotations._
+import org.openjdk.jmh.infra.Blackhole
+
+import org.apache.pekko.util.ByteString.{ ByteString1C, ByteString2 }
+
+/**
+ * Directional benchmark for PR #2924, comparing the two candidate strategies for
+ * the gRPC-shaped "5-byte header ++ payload" hot path:
+ *
+ * - copy: caller assembles the result into a fresh array and wraps with
+ * `ByteString.fromArrayUnsafe` — yields a contiguous `ByteString1C`.
+ * - bs2: `ByteString2.apply(header, payload)` — the new zero-copy 2-fragment impl.
+ *
+ * For each strategy we time the construction plus every realistic downstream op
+ * (parser reads, writer ops, slicing). Sizes cover unary RPC headers (5+64B),
+ * typical RPC bodies (5+1KB), medium streaming chunks (5+16KB) and large streaming
+ * chunks (5+64KB, 5+256KB).
+ */
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@BenchmarkMode(Array(Mode.Throughput))
+@Fork(1)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+class ByteString_concat3way_Benchmark {
+
+ @Param(Array("64", "1024", "16384", "65536", "262144"))
+ var payloadSize: Int = _
+
+ private implicit val byteOrder: ByteOrder = ByteOrder.BIG_ENDIAN
+
+ // 5-byte gRPC frame header: 1 compressed flag + 4 length BE
+ private val headerArr: Array[Byte] = Array[Byte](0, 0, 0, 0, 0)
+ private var payloadArr: Array[Byte] = _
+ private var header1C: ByteString1C = _
+ private var payload1C: ByteString1C = _
+
+ private var preCopy: ByteString = _
+ private var preBs2: ByteString = _
+
+ private var outBuf: ByteBuffer = _
+
+ @Setup
+ def setup(): Unit = {
+ payloadArr = Array.fill[Byte](payloadSize)(42)
+ header1C = ByteString.fromArrayUnsafe(headerArr).asInstanceOf[ByteString1C]
+ payload1C = ByteString.fromArrayUnsafe(payloadArr).asInstanceOf[ByteString1C]
+ preCopy = doCopy()
+ preBs2 = doBs2()
+ outBuf = ByteBuffer.allocate(headerArr.length + payloadSize)
+ }
+
+ // === concat-only (allocation cost) ===
+
+ @Benchmark
+ def concat_copy(): ByteString = doCopy()
+
+ @Benchmark
+ def concat_bs2(): ByteString = doBs2()
+
+ // === concat + parser reads via iterator (naive parser path) ===
+
+ @Benchmark
+ def concatIter_copy(): Int = {
+ val r = doCopy()
+ r.iterator.getInt + r(4).toInt
+ }
+
+ @Benchmark
+ def concatIter_bs2(): Int = {
+ val r = doBs2()
+ r.iterator.getInt + r(4).toInt
+ }
+
+ // === concat + parser reads via readIntBE (efficient parser path) ===
+
+ @Benchmark
+ def concatRead_copy(): Int = {
+ val r = doCopy()
+ r.readIntBE(0) + r(4).toInt
+ }
+
+ @Benchmark
+ def concatRead_bs2(): Int = {
+ val r = doBs2()
+ r.readIntBE(0) + r(4).toInt
+ }
+
+ // === concat + copyToBuffer (Netty/NIO write path) ===
+
+ @Benchmark
+ def concatCopyToBuf_copy(): Int = {
+ outBuf.clear()
+ doCopy().copyToBuffer(outBuf)
+ }
+
+ @Benchmark
+ def concatCopyToBuf_bs2(): Int = {
+ outBuf.clear()
+ doBs2().copyToBuffer(outBuf)
+ }
+
+ // === concat + asByteBuffers (gather I/O write path) ===
+
+ @Benchmark
+ def concatAsByteBuffers_copy(bh: Blackhole): Unit =
+ bh.consume(doCopy().asByteBuffers)
+
+ @Benchmark
+ def concatAsByteBuffers_bs2(bh: Blackhole): Unit =
+ bh.consume(doBs2().asByteBuffers)
+
+ // === concat + drop(5) (strip header) ===
+
+ @Benchmark
+ def concatDrop5_copy(): ByteString = doCopy().drop(5)
+
+ @Benchmark
+ def concatDrop5_bs2(): ByteString = doBs2().drop(5)
+
+ // === concat + take(5) (extract header) ===
+
+ @Benchmark
+ def concatTake5_copy(): ByteString = doCopy().take(5)
+
+ @Benchmark
+ def concatTake5_bs2(): ByteString = doBs2().take(5)
+
+ // === downstream-only on pre-built results ===
+
+ @Benchmark
+ def downIter_copy(): Int = preCopy.iterator.getInt + preCopy(4).toInt
+ @Benchmark
+ def downIter_bs2(): Int = preBs2.iterator.getInt + preBs2(4).toInt
+
+ @Benchmark
+ def downRead_copy(): Int = preCopy.readIntBE(0) + preCopy(4).toInt
+ @Benchmark
+ def downRead_bs2(): Int = preBs2.readIntBE(0) + preBs2(4).toInt
+
+ @Benchmark
+ def downCopyToBuf_copy(bh: Blackhole): Unit = {
+ outBuf.clear(); bh.consume(preCopy.copyToBuffer(outBuf))
+ }
+ @Benchmark
+ def downCopyToBuf_bs2(bh: Blackhole): Unit = {
+ outBuf.clear(); bh.consume(preBs2.copyToBuffer(outBuf))
+ }
+
+ @Benchmark
+ def downAsByteBuffers_copy(bh: Blackhole): Unit = bh.consume(preCopy.asByteBuffers)
+ @Benchmark
+ def downAsByteBuffers_bs2(bh: Blackhole): Unit = bh.consume(preBs2.asByteBuffers)
+
+ // ---- helpers ----
+
+ private def doCopy(): ByteString = {
+ val merged = new Array[Byte](headerArr.length + payloadSize)
+ System.arraycopy(headerArr, 0, merged, 0, headerArr.length)
+ System.arraycopy(payloadArr, 0, merged, headerArr.length, payloadSize)
+ ByteString.fromArrayUnsafe(merged)
+ }
+
+ private def doBs2(): ByteString =
+ ByteString2.apply(header1C, payload1C)
+}
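
Illustrative note (not part of the commit): the cross-boundary reads in the diff above route each logical offset to one of two fragments and assemble a big-endian value byte by byte. The sketch below may help readers follow that idea. It assumes a hypothetical stand-in class, TwoPart, built on plain arrays; it is not Pekko's ByteString2 and calls no Pekko APIs. Unlike the diff, it always assembles byte by byte and does not delegate to a fragment when all four bytes fall on one side.

```scala
// Hypothetical stand-in for a two-fragment byte sequence (illustration only).
final class TwoPart(first: Array[Byte], second: Array[Byte]) {
  require(first.nonEmpty && second.nonEmpty, "both fragments must be non-empty")

  val length: Int = first.length + second.length

  // Route a logical offset to whichever fragment holds it.
  def byteAt(offset: Int): Byte =
    if (offset < first.length) first(offset) else second(offset - first.length)

  // Big-endian Int assembled one byte at a time, so the result is the same
  // whether or not the four bytes straddle the fragment boundary.
  def readIntBE(offset: Int): Int = {
    require(offset >= 0 && offset + 4 <= length, s"offset $offset out of range")
    (byteAt(offset) & 0xFF) << 24 |
    (byteAt(offset + 1) & 0xFF) << 16 |
    (byteAt(offset + 2) & 0xFF) << 8 |
    (byteAt(offset + 3) & 0xFF)
  }
}

object TwoPartDemo {
  def main(args: Array[String]): Unit = {
    // A 2-byte prefix followed by a 3-byte tail: any 4-byte read crosses the boundary.
    val frame = new TwoPart(Array[Byte](1, 2), Array[Byte](3, 4, 5))
    println(f"0x${frame.readIntBE(0)}%08X")
    println(f"0x${frame.readIntBE(1)}%08X")
  }
}
```

The masking with 0xFF matters because Scala bytes are signed: without it, a byte such as 0x80 would sign-extend and corrupt the higher bits of the assembled value.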