This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new ef628ea320 add asInputStream to ByteString (#1085)
ef628ea320 is described below
commit ef628ea3206001a1bd13eb3e68c0ffced9764cc1
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Feb 5 16:09:56 2024 +0100
add asInputStream to ByteString (#1085)
* add getInputStream to ByteString
* scala 2 updates
* rename to asInputStream
* make methods final
* use different asInputStream implementations on different subclasses
Co-Authored-By: João Ferreira <[email protected]>
* Update ByteBufferBackedInputStream.scala
scalafmt
* remove default asInputStream impl
* add some tests - more needed
* Create bytestring-inputstream.excludes
* scalafmt
* Update bytestring-inputstream.excludes
* Update ByteStringInputStreamSpec.scala
* Update ByteStringInputStreamSpec.scala
* Update ByteStringInputStreamSpec.scala
* add benchmark
* Update ByteString_asInputStream_Benchmark.scala
* change to iterator earlier in chain
---------
Co-authored-by: João Ferreira <[email protected]>
---
.../pekko/util/ByteStringInputStreamSpec.scala | 81 ++++++++++++++++++
.../bytestring-inputstream.excludes | 19 +++++
.../org/apache/pekko/util/ByteString.scala | 21 ++++-
.../org/apache/pekko/util/ByteString.scala | 24 ++++--
.../scala-3/org/apache/pekko/util/ByteString.scala | 24 ++++--
bench-jmh/README.md | 2 +-
.../util/ByteString_asInputStream_Benchmark.scala | 98 ++++++++++++++++++++++
7 files changed, 256 insertions(+), 13 deletions(-)
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringInputStreamSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringInputStreamSpec.scala
new file mode 100644
index 0000000000..b49f4de017
--- /dev/null
+++
b/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringInputStreamSpec.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.util
+
+import java.io.{ ByteArrayOutputStream, InputStream, OutputStream }
+import java.nio.charset.StandardCharsets
+
+import org.apache.pekko
+import pekko.util.ByteString.{ ByteString1, ByteString1C, ByteStrings }
+
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+/**
+ * Tests `asInputStream` on the ByteString1, ByteString1C and ByteStrings
+ * representations: each stream is drained into a byte array and the bytes
+ * are decoded as UTF-8 and compared with the expected text.
+ */
+class ByteStringInputStreamSpec extends AnyWordSpec with Matchers {
+ "ByteString1" must {
+ "support asInputStream" in {
+ // An empty ByteString1 must report end of stream (-1) for both read variants.
+ ByteString1.empty.asInputStream.read() shouldEqual -1
+ ByteString1.empty.asInputStream.read(Array.empty) shouldEqual -1
+ toUtf8String(ByteString1.empty.asInputStream) shouldEqual ""
+ toUtf8String(ByteString1.fromString("abc").asInputStream) shouldEqual
"abc"
+ }
+ }
+ "ByteString1C" must {
+ "support asInputStream" in {
+ toUtf8String(ByteString1C.fromString("").asInputStream) shouldEqual ""
+ toUtf8String(ByteString1C.fromString("abc").asInputStream) shouldEqual
"abc"
+ // A slice (offset 1, length 3) of "abcdef" must stream only "bcd".
+ // NOTE(review): presumably fromArray(array, offset, length) yields a
+ // ByteString1C here -- confirm that this case belongs in this section.
+ val bytes = "abcdef".getBytes(StandardCharsets.US_ASCII)
+ toUtf8String(ByteString.fromArray(bytes, 1, 3).asInputStream)
shouldEqual "bcd"
+ }
+ }
+ "ByteStrings" must {
+ "support asInputStream" in {
+ // NOTE(review): ByteStrings(b1, b2) is called with two empty parts; if that
+ // factory collapses empty inputs, `empty` is not a ByteStrings instance and
+ // this case does not exercise ByteStrings.asInputStream -- confirm.
+ val empty = ByteStrings(ByteString1.fromString(""),
ByteString1.fromString(""))
+ empty.asInputStream.read() shouldEqual -1
+ empty.asInputStream.read(Array.empty) shouldEqual -1
+ toUtf8String(empty.asInputStream) shouldEqual ""
+ // Two non-empty fragments: the stream must yield their bytes in order.
+ val abc = ByteStrings(ByteString1.fromString("a"),
ByteString1.fromString("bc"))
+ toUtf8String(abc.asInputStream) shouldEqual "abc"
+ }
+ }
+
+ /** Reads `input` to end of stream and decodes all bytes read as UTF-8. */
+ private def toUtf8String(input: InputStream): String =
+ new String(toByteArray(input), StandardCharsets.UTF_8)
+
+ /**
+ * Reads `input` to end of stream into a new byte array.
+ * Only the internal output buffer is closed; `input` is left open.
+ */
+ private def toByteArray(input: InputStream): Array[Byte] = {
+ val output = new ByteArrayOutputStream
+ try {
+ copy(input, output)
+ output.toByteArray
+ } finally {
+ output.close()
+ }
+ }
+
+ /**
+ * Copies `input` to `output` in chunks of up to 4096 bytes until `read`
+ * returns -1.
+ *
+ * @return the total number of bytes copied
+ */
+ private def copy(input: InputStream, output: OutputStream): Int = {
+ val buffer = new Array[Byte](4096)
+ var count = 0
+ var n = input.read(buffer)
+ while (n != -1) {
+ output.write(buffer, 0, n)
+ count += n
+ n = input.read(buffer)
+ }
+ count
+ }
+}
diff --git
a/actor/src/main/mima-filters/1.0.x.backwards.excludes/bytestring-inputstream.excludes
b/actor/src/main/mima-filters/1.0.x.backwards.excludes/bytestring-inputstream.excludes
new file mode 100644
index 0000000000..3f071efce5
--- /dev/null
+++
b/actor/src/main/mima-filters/1.0.x.backwards.excludes/bytestring-inputstream.excludes
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Add ByteString.asInputStream
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.util.ByteString.asInputStream")
diff --git a/actor/src/main/scala-2.12/org/apache/pekko/util/ByteString.scala
b/actor/src/main/scala-2.12/org/apache/pekko/util/ByteString.scala
index 0b06483b38..ac9d6ead76 100644
--- a/actor/src/main/scala-2.12/org/apache/pekko/util/ByteString.scala
+++ b/actor/src/main/scala-2.12/org/apache/pekko/util/ByteString.scala
@@ -13,7 +13,7 @@
package org.apache.pekko.util
-import java.io.{ ObjectInputStream, ObjectOutputStream }
+import java.io.{ ByteArrayInputStream, InputStream, ObjectInputStream,
ObjectOutputStream, SequenceInputStream }
import java.lang.{ Iterable => JIterable }
import java.nio.{ ByteBuffer, ByteOrder }
import java.nio.charset.{ Charset, StandardCharsets }
@@ -21,6 +21,7 @@ import java.util.Base64
import scala.annotation.{ tailrec, varargs }
import scala.collection.IndexedSeqOptimized
+import scala.collection.JavaConverters._
import scala.collection.generic.CanBuildFrom
import scala.collection.immutable
import scala.collection.immutable.{ IndexedSeq, VectorBuilder }
@@ -271,6 +272,8 @@ object ByteString {
}
override def toArrayUnsafe(): Array[Byte] = bytes
+
+ override def asInputStream: InputStream = new ByteArrayInputStream(bytes)
}
/** INTERNAL API: ByteString backed by exactly one array, with start / end
markers */
@@ -436,6 +439,9 @@ object ByteString {
if (startIndex == 0 && length == bytes.length) bytes
else toArray
}
+
+ override def asInputStream: InputStream =
+ new ByteArrayInputStream(bytes, startIndex, length)
}
private[pekko] object ByteStrings extends Companion {
@@ -566,6 +572,9 @@ object ByteString {
def asByteBuffers: scala.collection.immutable.Iterable[ByteBuffer] =
bytestrings.map { _.asByteBuffer }
+ override def asInputStream: InputStream =
+ new
SequenceInputStream(bytestrings.iterator.map(_.asInputStream).asJavaEnumeration)
+
def decodeString(charset: String): String = compact.decodeString(charset)
def decodeString(charset: Charset): String = compact.decodeString(charset)
@@ -827,6 +836,15 @@ sealed abstract class ByteString extends IndexedSeq[Byte]
with IndexedSeqOptimiz
*/
def toArrayUnsafe(): Array[Byte] = toArray
+ /**
+ * Return the bytes in this ByteString as an InputStream.
+ *
+ * @return the bytes in this ByteString accessible as an InputStream
+ * @see [[asByteBuffer]]
+ * @since 1.1.0
+ */
+ def asInputStream: InputStream
+
override def foreach[@specialized U](f: Byte => U): Unit =
iterator.foreach(f)
private[pekko] def writeToOutputStream(os: ObjectOutputStream): Unit
@@ -886,7 +904,6 @@ sealed abstract class ByteString extends IndexedSeq[Byte]
with IndexedSeqOptimiz
* all fragments. Will always have at least one entry.
*/
def getByteBuffers(): JIterable[ByteBuffer] = {
- import scala.collection.JavaConverters.asJavaIterableConverter
asByteBuffers.asJava
}
diff --git a/actor/src/main/scala-2.13/org/apache/pekko/util/ByteString.scala
b/actor/src/main/scala-2.13/org/apache/pekko/util/ByteString.scala
index bf5be5a69a..ca3ee818d8 100644
--- a/actor/src/main/scala-2.13/org/apache/pekko/util/ByteString.scala
+++ b/actor/src/main/scala-2.13/org/apache/pekko/util/ByteString.scala
@@ -13,17 +13,17 @@
package org.apache.pekko.util
-import java.io.{ ObjectInputStream, ObjectOutputStream }
+import java.io.{ ByteArrayInputStream, InputStream, ObjectInputStream,
ObjectOutputStream, SequenceInputStream }
import java.lang.{ Iterable => JIterable }
import java.nio.{ ByteBuffer, ByteOrder }
import java.nio.charset.{ Charset, StandardCharsets }
import java.util.Base64
-import scala.annotation.{ tailrec, varargs }
+import scala.annotation.{ nowarn, tailrec, varargs }
import scala.collection.{ immutable, mutable }
import scala.collection.immutable.{ IndexedSeq, IndexedSeqOps,
StrictOptimizedSeqOps, VectorBuilder }
import scala.collection.mutable.{ Builder, WrappedArray }
+import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
-import scala.annotation.nowarn
object ByteString {
@@ -278,6 +278,7 @@ object ByteString {
override def toArrayUnsafe(): Array[Byte] = bytes
+ override def asInputStream: InputStream = new ByteArrayInputStream(bytes)
}
/** INTERNAL API: ByteString backed by exactly one array, with start / end
markers */
@@ -448,6 +449,9 @@ object ByteString {
if (startIndex == 0 && length == bytes.length) bytes
else toArray
}
+
+ override def asInputStream: InputStream =
+ new ByteArrayInputStream(bytes, startIndex, length)
}
private[pekko] object ByteStrings extends Companion {
@@ -578,6 +582,9 @@ object ByteString {
def asByteBuffers: scala.collection.immutable.Iterable[ByteBuffer] =
bytestrings.map { _.asByteBuffer }
+ override def asInputStream: InputStream =
+ new
SequenceInputStream(bytestrings.iterator.map(_.asInputStream).asJavaEnumeration)
+
def decodeString(charset: String): String = compact.decodeString(charset)
def decodeString(charset: Charset): String = compact.decodeString(charset)
@@ -876,6 +883,15 @@ sealed abstract class ByteString
*/
def toArrayUnsafe(): Array[Byte] = toArray
+ /**
+ * Return the bytes in this ByteString as an InputStream.
+ *
+ * @return the bytes in this ByteString accessible as an InputStream
+ * @see [[asByteBuffer]]
+ * @since 1.1.0
+ */
+ def asInputStream: InputStream
+
override def foreach[@specialized U](f: Byte => U): Unit =
iterator.foreach(f)
private[pekko] def writeToOutputStream(os: ObjectOutputStream): Unit
@@ -931,9 +947,7 @@ sealed abstract class ByteString
* Java API: Returns an Iterable of read-only ByteBuffers that directly
wraps this ByteStrings
* all fragments. Will always have at least one entry.
*/
- @nowarn
def getByteBuffers(): JIterable[ByteBuffer] = {
- import scala.collection.JavaConverters.asJavaIterableConverter
asByteBuffers.asJava
}
diff --git a/actor/src/main/scala-3/org/apache/pekko/util/ByteString.scala
b/actor/src/main/scala-3/org/apache/pekko/util/ByteString.scala
index bcd4c3284c..e8e64af848 100644
--- a/actor/src/main/scala-3/org/apache/pekko/util/ByteString.scala
+++ b/actor/src/main/scala-3/org/apache/pekko/util/ByteString.scala
@@ -13,7 +13,7 @@
package org.apache.pekko.util
-import java.io.{ ObjectInputStream, ObjectOutputStream }
+import java.io.{ ByteArrayInputStream, InputStream, ObjectInputStream,
ObjectOutputStream, SequenceInputStream }
import java.lang.{ Iterable => JIterable }
import java.nio.{ ByteBuffer, ByteOrder }
import java.nio.charset.{ Charset, StandardCharsets }
@@ -23,10 +23,9 @@ import scala.annotation.{ tailrec, varargs }
import scala.collection.{ immutable, mutable }
import scala.collection.immutable.{ IndexedSeq, IndexedSeqOps,
StrictOptimizedSeqOps, VectorBuilder }
import scala.collection.mutable.{ Builder, WrappedArray }
+import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
-import scala.annotation.nowarn
-
object ByteString {
/**
@@ -279,6 +278,8 @@ object ByteString {
}
override def toArrayUnsafe(): Array[Byte] = bytes
+
+ override def asInputStream: InputStream = new ByteArrayInputStream(bytes)
}
/** INTERNAL API: ByteString backed by exactly one array, with start / end
markers */
@@ -449,6 +450,9 @@ object ByteString {
if (startIndex == 0 && length == bytes.length) bytes
else toArray
}
+
+ override def asInputStream: InputStream =
+ new ByteArrayInputStream(bytes, startIndex, length)
}
private[pekko] object ByteStrings extends Companion {
@@ -579,6 +583,9 @@ object ByteString {
def asByteBuffers: scala.collection.immutable.Iterable[ByteBuffer] =
bytestrings.map { _.asByteBuffer }
+ override def asInputStream: InputStream =
+ new
SequenceInputStream(bytestrings.iterator.map(_.asInputStream).asJavaEnumeration)
+
def decodeString(charset: String): String = compact.decodeString(charset)
def decodeString(charset: Charset): String = compact.decodeString(charset)
@@ -876,6 +883,15 @@ sealed abstract class ByteString
*/
def toArrayUnsafe(): Array[Byte] = toArray
+ /**
+ * Return the bytes in this ByteString as an InputStream.
+ *
+ * @return the bytes in this ByteString accessible as an InputStream
+ * @see [[asByteBuffer]]
+ * @since 1.1.0
+ */
+ def asInputStream: InputStream
+
override def foreach[@specialized U](f: Byte => U): Unit =
iterator.foreach(f)
private[pekko] def writeToOutputStream(os: ObjectOutputStream): Unit
@@ -931,9 +947,7 @@ sealed abstract class ByteString
* Java API: Returns an Iterable of read-only ByteBuffers that directly
wraps this ByteStrings
* all fragments. Will always have at least one entry.
*/
- @nowarn
def getByteBuffers(): JIterable[ByteBuffer] = {
- import scala.collection.JavaConverters.asJavaIterableConverter
asByteBuffers.asJava
}
diff --git a/bench-jmh/README.md b/bench-jmh/README.md
index df4709fcaf..08aa061eb7 100644
--- a/bench-jmh/README.md
+++ b/bench-jmh/README.md
@@ -9,7 +9,7 @@ Pekko uses [sbt-jmh](https://github.com/sbt/sbt-jmh) to
integrate [Java Microben
```shell
sbt shell
pekko > project bench-jmh
-sbt:bench-jmh> Jmh/run -i 3 -wi 3 -f 1 .*ActorCreationBenchmark
+sbt:pekko-bench-jmh> Jmh/run -i 3 -wi 3 -f 1 .*ActorCreationBenchmark
```
or execute in one-line command
diff --git
a/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_asInputStream_Benchmark.scala
b/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_asInputStream_Benchmark.scala
new file mode 100644
index 0000000000..36fb2b9e2d
--- /dev/null
+++
b/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_asInputStream_Benchmark.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.util
+
+import java.io.{ ByteArrayInputStream, InputStream }
+import java.util.concurrent.TimeUnit
+
+import org.openjdk.jmh.annotations._
+import org.openjdk.jmh.infra.Blackhole
+
+/**
+ * Compares ByteString.asInputStream and new
ByteArrayInputStream(ByteString.toArray).
+ *
+ * Each benchmark builds a stream over either a single ByteString (`bs`) or a
+ * concatenation of many copies of it (`composed`), drains the stream, and
+ * passes the number of bytes read to the Blackhole.
+ */
+@State(Scope.Benchmark)
+@Measurement(timeUnit = TimeUnit.MILLISECONDS)
+class ByteString_asInputStream_Benchmark {
+
+ // ByteString over `1024 * kb` zero-valued bytes, created in setup().
+ var bs: ByteString = _
+
+ // `bs` appended 101 times (`0 to 100` is inclusive) in setup();
+ // presumably a multi-fragment ByteStrings -- confirm.
+ var composed: ByteString = _
+
+ // Size of `bs` in KiB.
+ @Param(Array("10", "100", "1000"))
+ var kb = 0
+
+ /*
+ bench-jmh/jmh:run -f 1 -wi 3 -i 3 .*ByteString_asInputStream_Benchmark.*
+
+ [info] Benchmark
(kb) Mode Cnt Score Error Units
+ [info] ByteString_asInputStream_Benchmark.composed_bs_as_input_stream
10 thrpt 3 32398.229 ± 26714.266 ops/s
+ [info] ByteString_asInputStream_Benchmark.composed_bs_as_input_stream
100 thrpt 3 3642.487 ± 576.459 ops/s
+ [info] ByteString_asInputStream_Benchmark.composed_bs_as_input_stream
1000 thrpt 3 285.910 ± 40.463 ops/s
+ [info]
ByteString_asInputStream_Benchmark.composed_bs_bytes_to_input_stream 10
thrpt 3 6182.509 ± 933.899 ops/s
+ [info]
ByteString_asInputStream_Benchmark.composed_bs_bytes_to_input_stream 100
thrpt 3 474.634 ± 84.763 ops/s
+ [info]
ByteString_asInputStream_Benchmark.composed_bs_bytes_to_input_stream 1000
thrpt 3 38.764 ± 49.698 ops/s
+ [info] ByteString_asInputStream_Benchmark.single_bs_as_input_stream
10 thrpt 3 2436952.866 ± 1253216.244 ops/s
+ [info] ByteString_asInputStream_Benchmark.single_bs_as_input_stream
100 thrpt 3 339116.689 ± 297756.892 ops/s
+ [info] ByteString_asInputStream_Benchmark.single_bs_as_input_stream
1000 thrpt 3 32592.451 ± 12465.507 ops/s
+ [info] ByteString_asInputStream_Benchmark.single_bs_bytes_to_input_stream
10 thrpt 3 619077.237 ± 200242.708 ops/s
+ [info] ByteString_asInputStream_Benchmark.single_bs_bytes_to_input_stream
100 thrpt 3 50481.984 ± 78485.741 ops/s
+ [info] ByteString_asInputStream_Benchmark.single_bs_bytes_to_input_stream
1000 thrpt 3 4271.984 ± 1061.978 ops/s
+ */
+
+ // Creates `bs` from `1024 * kb` bytes and builds `composed` by appending `bs` 101 times.
+ @Setup
+ def setup(): Unit = {
+ val bytes = Array.ofDim[Byte](1024 * kb)
+ bs = ByteString(bytes)
+ composed = ByteString.empty
+ for (_ <- 0 to 100) {
+ composed = composed ++ bs
+ }
+ }
+
+ // Baseline: copy `bs` with toArray, then stream the copy.
+ @Benchmark
+ def single_bs_bytes_to_input_stream(blackhole: Blackhole): Unit = {
+ blackhole.consume(countBytes(new ByteArrayInputStream(bs.toArray)))
+ }
+
+ // Baseline: copy `composed` with toArray, then stream the copy.
+ @Benchmark
+ def composed_bs_bytes_to_input_stream(blackhole: Blackhole): Unit = {
+ blackhole.consume(countBytes(new ByteArrayInputStream(composed.toArray)))
+ }
+
+ // Streams `bs` directly through asInputStream, without an explicit toArray copy.
+ @Benchmark
+ def single_bs_as_input_stream(blackhole: Blackhole): Unit = {
+ blackhole.consume(countBytes(bs.asInputStream))
+ }
+
+ // Streams `composed` directly through asInputStream, without an explicit toArray copy.
+ @Benchmark
+ def composed_bs_as_input_stream(blackhole: Blackhole): Unit = {
+ blackhole.consume(countBytes(composed.asInputStream))
+ }
+
+ /**
+ * Reads `stream` until `read` returns -1, using a 1024-byte buffer.
+ * The stream is not closed.
+ *
+ * @return the total number of bytes read
+ */
+ private def countBytes(stream: InputStream): Int = {
+ val buffer = new Array[Byte](1024)
+ var count = 0
+ var read = stream.read(buffer)
+ while (read != -1) {
+ count += read
+ read = stream.read(buffer)
+ }
+ count
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]