This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new ef628ea320 add asInputStream to ByteString (#1085)
ef628ea320 is described below

commit ef628ea3206001a1bd13eb3e68c0ffced9764cc1
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Feb 5 16:09:56 2024 +0100

    add asInputStream to ByteString (#1085)
    
    * add getInputStream to ByteString
    
    * scala 2 updates
    
    * rename as asInputStream
    
    * make methods final
    
    * use different asInputStream implementations on different subclasses
    
    Co-Authored-By: João Ferreira <[email protected]>
    
    * Update ByteBufferBackedInputStream.scala
    
    scalafmt
    
    * remove default asInputStream impl
    
    * add some tests - more needed
    
    * Create bytestring-inputstream.excludes
    
    * scalafmt
    
    * Update bytestring-inputstream.excludes
    
    * Update ByteStringInputStreamSpec.scala
    
    * Update ByteStringInputStreamSpec.scala
    
    * Update ByteStringInputStreamSpec.scala
    
    * add benchmark
    
    * Update ByteString_asInputStream_Benchmark.scala
    
    * change to iterator earlier in chain
    
    ---------
    
    Co-authored-by: João Ferreira <[email protected]>
---
 .../pekko/util/ByteStringInputStreamSpec.scala     | 81 ++++++++++++++++++
 .../bytestring-inputstream.excludes                | 19 +++++
 .../org/apache/pekko/util/ByteString.scala         | 21 ++++-
 .../org/apache/pekko/util/ByteString.scala         | 24 ++++--
 .../scala-3/org/apache/pekko/util/ByteString.scala | 24 ++++--
 bench-jmh/README.md                                |  2 +-
 .../util/ByteString_asInputStream_Benchmark.scala  | 98 ++++++++++++++++++++++
 7 files changed, 256 insertions(+), 13 deletions(-)

diff --git a/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringInputStreamSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringInputStreamSpec.scala
new file mode 100644
index 0000000000..b49f4de017
--- /dev/null
+++ b/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringInputStreamSpec.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.util
+
+import java.io.{ ByteArrayOutputStream, InputStream, OutputStream }
+import java.nio.charset.StandardCharsets
+
+import org.apache.pekko
+import pekko.util.ByteString.{ ByteString1, ByteString1C, ByteStrings }
+
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+class ByteStringInputStreamSpec extends AnyWordSpec with Matchers {
+  "ByteString1" must {
+    "support asInputStream" in {
+      ByteString1.empty.asInputStream.read() shouldEqual -1
+      ByteString1.empty.asInputStream.read(Array.empty) shouldEqual -1
+      toUtf8String(ByteString1.empty.asInputStream) shouldEqual ""
+      toUtf8String(ByteString1.fromString("abc").asInputStream) shouldEqual "abc"
+    }
+  }
+  "ByteString1C" must {
+    "support asInputStream" in {
+      toUtf8String(ByteString1C.fromString("").asInputStream) shouldEqual ""
+      toUtf8String(ByteString1C.fromString("abc").asInputStream) shouldEqual "abc"
+      val bytes = "abcdef".getBytes(StandardCharsets.US_ASCII)
+      toUtf8String(ByteString.fromArray(bytes, 1, 3).asInputStream) shouldEqual "bcd"
+    }
+  }
+  "ByteStrings" must {
+    "support asInputStream" in {
+      val empty = ByteStrings(ByteString1.fromString(""), ByteString1.fromString(""))
+      empty.asInputStream.read() shouldEqual -1
+      empty.asInputStream.read(Array.empty) shouldEqual -1
+      toUtf8String(empty.asInputStream) shouldEqual ""
+      val abc = ByteStrings(ByteString1.fromString("a"), ByteString1.fromString("bc"))
+      toUtf8String(abc.asInputStream) shouldEqual "abc"
+    }
+  }
+
+  private def toUtf8String(input: InputStream): String =
+    new String(toByteArray(input), StandardCharsets.UTF_8)
+
+  private def toByteArray(input: InputStream): Array[Byte] = {
+    val output = new ByteArrayOutputStream
+    try {
+      copy(input, output)
+      output.toByteArray
+    } finally {
+      output.close()
+    }
+  }
+
+  private def copy(input: InputStream, output: OutputStream): Int = {
+    val buffer = new Array[Byte](4096)
+    var count = 0
+    var n = input.read(buffer)
+    while (n != -1) {
+      output.write(buffer, 0, n)
+      count += n
+      n = input.read(buffer)
+    }
+    count
+  }
+}
diff --git a/actor/src/main/mima-filters/1.0.x.backwards.excludes/bytestring-inputstream.excludes b/actor/src/main/mima-filters/1.0.x.backwards.excludes/bytestring-inputstream.excludes
new file mode 100644
index 0000000000..3f071efce5
--- /dev/null
+++ b/actor/src/main/mima-filters/1.0.x.backwards.excludes/bytestring-inputstream.excludes
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Add ByteString.asInputStream
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.util.ByteString.asInputStream")
diff --git a/actor/src/main/scala-2.12/org/apache/pekko/util/ByteString.scala b/actor/src/main/scala-2.12/org/apache/pekko/util/ByteString.scala
index 0b06483b38..ac9d6ead76 100644
--- a/actor/src/main/scala-2.12/org/apache/pekko/util/ByteString.scala
+++ b/actor/src/main/scala-2.12/org/apache/pekko/util/ByteString.scala
@@ -13,7 +13,7 @@
 
 package org.apache.pekko.util
 
-import java.io.{ ObjectInputStream, ObjectOutputStream }
+import java.io.{ ByteArrayInputStream, InputStream, ObjectInputStream, ObjectOutputStream, SequenceInputStream }
 import java.lang.{ Iterable => JIterable }
 import java.nio.{ ByteBuffer, ByteOrder }
 import java.nio.charset.{ Charset, StandardCharsets }
@@ -21,6 +21,7 @@ import java.util.Base64
 
 import scala.annotation.{ tailrec, varargs }
 import scala.collection.IndexedSeqOptimized
+import scala.collection.JavaConverters._
 import scala.collection.generic.CanBuildFrom
 import scala.collection.immutable
 import scala.collection.immutable.{ IndexedSeq, VectorBuilder }
@@ -271,6 +272,8 @@ object ByteString {
     }
 
     override def toArrayUnsafe(): Array[Byte] = bytes
+
+    override def asInputStream: InputStream = new ByteArrayInputStream(bytes)
   }
 
   /** INTERNAL API: ByteString backed by exactly one array, with start / end markers */
@@ -436,6 +439,9 @@ object ByteString {
       if (startIndex == 0 && length == bytes.length) bytes
       else toArray
     }
+
+    override def asInputStream: InputStream =
+      new ByteArrayInputStream(bytes, startIndex, length)
   }
 
   private[pekko] object ByteStrings extends Companion {
@@ -566,6 +572,9 @@ object ByteString {
 
     def asByteBuffers: scala.collection.immutable.Iterable[ByteBuffer] = bytestrings.map { _.asByteBuffer }
 
+    override def asInputStream: InputStream =
+      new SequenceInputStream(bytestrings.iterator.map(_.asInputStream).asJavaEnumeration)
+
     def decodeString(charset: String): String = compact.decodeString(charset)
 
     def decodeString(charset: Charset): String = compact.decodeString(charset)
@@ -827,6 +836,15 @@ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimiz
    */
   def toArrayUnsafe(): Array[Byte] = toArray
 
+  /**
+   * Return the bytes in this ByteString as an InputStream.
+   *
+   * @return the bytes in this ByteString accessible as an InputStream
+   * @see [[asByteBuffer]]
+   * @since 1.1.0
+   */
+  def asInputStream: InputStream
+
   override def foreach[@specialized U](f: Byte => U): Unit = iterator.foreach(f)
 
   private[pekko] def writeToOutputStream(os: ObjectOutputStream): Unit
@@ -886,7 +904,6 @@ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimiz
    * all fragments. Will always have at least one entry.
    */
   def getByteBuffers(): JIterable[ByteBuffer] = {
-    import scala.collection.JavaConverters.asJavaIterableConverter
     asByteBuffers.asJava
   }
 
diff --git a/actor/src/main/scala-2.13/org/apache/pekko/util/ByteString.scala b/actor/src/main/scala-2.13/org/apache/pekko/util/ByteString.scala
index bf5be5a69a..ca3ee818d8 100644
--- a/actor/src/main/scala-2.13/org/apache/pekko/util/ByteString.scala
+++ b/actor/src/main/scala-2.13/org/apache/pekko/util/ByteString.scala
@@ -13,17 +13,17 @@
 
 package org.apache.pekko.util
 
-import java.io.{ ObjectInputStream, ObjectOutputStream }
+import java.io.{ ByteArrayInputStream, InputStream, ObjectInputStream, ObjectOutputStream, SequenceInputStream }
 import java.lang.{ Iterable => JIterable }
 import java.nio.{ ByteBuffer, ByteOrder }
 import java.nio.charset.{ Charset, StandardCharsets }
 import java.util.Base64
-import scala.annotation.{ tailrec, varargs }
+import scala.annotation.{ nowarn, tailrec, varargs }
 import scala.collection.{ immutable, mutable }
 import scala.collection.immutable.{ IndexedSeq, IndexedSeqOps, StrictOptimizedSeqOps, VectorBuilder }
 import scala.collection.mutable.{ Builder, WrappedArray }
+import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
-import scala.annotation.nowarn
 
 object ByteString {
 
@@ -278,6 +278,7 @@ object ByteString {
 
     override def toArrayUnsafe(): Array[Byte] = bytes
 
+    override def asInputStream: InputStream = new ByteArrayInputStream(bytes)
   }
 
   /** INTERNAL API: ByteString backed by exactly one array, with start / end markers */
@@ -448,6 +449,9 @@ object ByteString {
       if (startIndex == 0 && length == bytes.length) bytes
       else toArray
     }
+
+    override def asInputStream: InputStream =
+      new ByteArrayInputStream(bytes, startIndex, length)
   }
 
   private[pekko] object ByteStrings extends Companion {
@@ -578,6 +582,9 @@ object ByteString {
 
     def asByteBuffers: scala.collection.immutable.Iterable[ByteBuffer] = bytestrings.map { _.asByteBuffer }
 
+    override def asInputStream: InputStream =
+      new SequenceInputStream(bytestrings.iterator.map(_.asInputStream).asJavaEnumeration)
+
     def decodeString(charset: String): String = compact.decodeString(charset)
 
     def decodeString(charset: Charset): String = compact.decodeString(charset)
@@ -876,6 +883,15 @@ sealed abstract class ByteString
    */
   def toArrayUnsafe(): Array[Byte] = toArray
 
+  /**
+   * Return the bytes in this ByteString as an InputStream.
+   *
+   * @return the bytes in this ByteString accessible as an InputStream
+   * @see [[asByteBuffer]]
+   * @since 1.1.0
+   */
+  def asInputStream: InputStream
+
   override def foreach[@specialized U](f: Byte => U): Unit = iterator.foreach(f)
 
   private[pekko] def writeToOutputStream(os: ObjectOutputStream): Unit
@@ -931,9 +947,7 @@ sealed abstract class ByteString
    * Java API: Returns an Iterable of read-only ByteBuffers that directly wraps this ByteStrings
    * all fragments. Will always have at least one entry.
    */
-  @nowarn
   def getByteBuffers(): JIterable[ByteBuffer] = {
-    import scala.collection.JavaConverters.asJavaIterableConverter
     asByteBuffers.asJava
   }
 
diff --git a/actor/src/main/scala-3/org/apache/pekko/util/ByteString.scala b/actor/src/main/scala-3/org/apache/pekko/util/ByteString.scala
index bcd4c3284c..e8e64af848 100644
--- a/actor/src/main/scala-3/org/apache/pekko/util/ByteString.scala
+++ b/actor/src/main/scala-3/org/apache/pekko/util/ByteString.scala
@@ -13,7 +13,7 @@
 
 package org.apache.pekko.util
 
-import java.io.{ ObjectInputStream, ObjectOutputStream }
+import java.io.{ ByteArrayInputStream, InputStream, ObjectInputStream, ObjectOutputStream, SequenceInputStream }
 import java.lang.{ Iterable => JIterable }
 import java.nio.{ ByteBuffer, ByteOrder }
 import java.nio.charset.{ Charset, StandardCharsets }
@@ -23,10 +23,9 @@ import scala.annotation.{ tailrec, varargs }
 import scala.collection.{ immutable, mutable }
 import scala.collection.immutable.{ IndexedSeq, IndexedSeqOps, StrictOptimizedSeqOps, VectorBuilder }
 import scala.collection.mutable.{ Builder, WrappedArray }
+import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
-import scala.annotation.nowarn
-
 object ByteString {
 
   /**
@@ -279,6 +278,8 @@ object ByteString {
     }
 
     override def toArrayUnsafe(): Array[Byte] = bytes
+
+    override def asInputStream: InputStream = new ByteArrayInputStream(bytes)
   }
 
   /** INTERNAL API: ByteString backed by exactly one array, with start / end markers */
@@ -449,6 +450,9 @@ object ByteString {
       if (startIndex == 0 && length == bytes.length) bytes
       else toArray
     }
+
+    override def asInputStream: InputStream =
+      new ByteArrayInputStream(bytes, startIndex, length)
   }
 
   private[pekko] object ByteStrings extends Companion {
@@ -579,6 +583,9 @@ object ByteString {
 
     def asByteBuffers: scala.collection.immutable.Iterable[ByteBuffer] = bytestrings.map { _.asByteBuffer }
 
+    override def asInputStream: InputStream =
+      new SequenceInputStream(bytestrings.iterator.map(_.asInputStream).asJavaEnumeration)
+
     def decodeString(charset: String): String = compact.decodeString(charset)
 
     def decodeString(charset: Charset): String = compact.decodeString(charset)
@@ -876,6 +883,15 @@ sealed abstract class ByteString
    */
   def toArrayUnsafe(): Array[Byte] = toArray
 
+  /**
+   * Return the bytes in this ByteString as an InputStream.
+   *
+   * @return the bytes in this ByteString accessible as an InputStream
+   * @see [[asByteBuffer]]
+   * @since 1.1.0
+   */
+  def asInputStream: InputStream
+
   override def foreach[@specialized U](f: Byte => U): Unit = iterator.foreach(f)
 
   private[pekko] def writeToOutputStream(os: ObjectOutputStream): Unit
@@ -931,9 +947,7 @@ sealed abstract class ByteString
    * Java API: Returns an Iterable of read-only ByteBuffers that directly wraps this ByteStrings
    * all fragments. Will always have at least one entry.
    */
-  @nowarn
   def getByteBuffers(): JIterable[ByteBuffer] = {
-    import scala.collection.JavaConverters.asJavaIterableConverter
     asByteBuffers.asJava
   }
 
diff --git a/bench-jmh/README.md b/bench-jmh/README.md
index df4709fcaf..08aa061eb7 100644
--- a/bench-jmh/README.md
+++ b/bench-jmh/README.md
@@ -9,7 +9,7 @@ Pekko uses [sbt-jmh](https://github.com/sbt/sbt-jmh) to integrate [Java Microben
 ```shell
 sbt shell
 pekko > project bench-jmh
-sbt:bench-jmh> Jmh/run -i 3 -wi 3 -f 1 .*ActorCreationBenchmark
+sbt:pekko-bench-jmh> Jmh/run -i 3 -wi 3 -f 1 .*ActorCreationBenchmark
 ```
 
 or execute in one-line command
diff --git a/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_asInputStream_Benchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_asInputStream_Benchmark.scala
new file mode 100644
index 0000000000..36fb2b9e2d
--- /dev/null
+++ b/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_asInputStream_Benchmark.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.util
+
+import java.io.{ ByteArrayInputStream, InputStream }
+import java.util.concurrent.TimeUnit
+
+import org.openjdk.jmh.annotations._
+import org.openjdk.jmh.infra.Blackhole
+
+/**
+ * Compares ByteString.asInputStream and new ByteArrayInputStream(ByteString.toArray).
+ */
+@State(Scope.Benchmark)
+@Measurement(timeUnit = TimeUnit.MILLISECONDS)
+class ByteString_asInputStream_Benchmark {
+
+  var bs: ByteString = _
+
+  var composed: ByteString = _
+
+  @Param(Array("10", "100", "1000"))
+  var kb = 0
+
+  /*
+    bench-jmh/jmh:run -f 1 -wi 3 -i 3 .*ByteString_asInputStream_Benchmark.*
+
+    [info] Benchmark                                                             (kb)   Mode  Cnt        Score         Error  Units
+    [info] ByteString_asInputStream_Benchmark.composed_bs_as_input_stream          10  thrpt    3    32398.229 ±   26714.266  ops/s
+    [info] ByteString_asInputStream_Benchmark.composed_bs_as_input_stream         100  thrpt    3     3642.487 ±     576.459  ops/s
+    [info] ByteString_asInputStream_Benchmark.composed_bs_as_input_stream        1000  thrpt    3      285.910 ±      40.463  ops/s
+    [info] ByteString_asInputStream_Benchmark.composed_bs_bytes_to_input_stream    10  thrpt    3     6182.509 ±     933.899  ops/s
+    [info] ByteString_asInputStream_Benchmark.composed_bs_bytes_to_input_stream   100  thrpt    3      474.634 ±      84.763  ops/s
+    [info] ByteString_asInputStream_Benchmark.composed_bs_bytes_to_input_stream  1000  thrpt    3       38.764 ±      49.698  ops/s
+    [info] ByteString_asInputStream_Benchmark.single_bs_as_input_stream            10  thrpt    3  2436952.866 ± 1253216.244  ops/s
+    [info] ByteString_asInputStream_Benchmark.single_bs_as_input_stream           100  thrpt    3   339116.689 ±  297756.892  ops/s
+    [info] ByteString_asInputStream_Benchmark.single_bs_as_input_stream          1000  thrpt    3    32592.451 ±   12465.507  ops/s
+    [info] ByteString_asInputStream_Benchmark.single_bs_bytes_to_input_stream      10  thrpt    3   619077.237 ±  200242.708  ops/s
+    [info] ByteString_asInputStream_Benchmark.single_bs_bytes_to_input_stream     100  thrpt    3    50481.984 ±   78485.741  ops/s
+    [info] ByteString_asInputStream_Benchmark.single_bs_bytes_to_input_stream    1000  thrpt    3     4271.984 ±    1061.978  ops/s
+   */
+
+  @Setup
+  def setup(): Unit = {
+    val bytes = Array.ofDim[Byte](1024 * kb)
+    bs = ByteString(bytes)
+    composed = ByteString.empty
+    for (_ <- 0 to 100) {
+      composed = composed ++ bs
+    }
+  }
+
+  @Benchmark
+  def single_bs_bytes_to_input_stream(blackhole: Blackhole): Unit = {
+    blackhole.consume(countBytes(new ByteArrayInputStream(bs.toArray)))
+  }
+
+  @Benchmark
+  def composed_bs_bytes_to_input_stream(blackhole: Blackhole): Unit = {
+    blackhole.consume(countBytes(new ByteArrayInputStream(composed.toArray)))
+  }
+
+  @Benchmark
+  def single_bs_as_input_stream(blackhole: Blackhole): Unit = {
+    blackhole.consume(countBytes(bs.asInputStream))
+  }
+
+  @Benchmark
+  def composed_bs_as_input_stream(blackhole: Blackhole): Unit = {
+    blackhole.consume(countBytes(composed.asInputStream))
+  }
+
+  private def countBytes(stream: InputStream): Int = {
+    val buffer = new Array[Byte](1024)
+    var count = 0
+    var read = stream.read(buffer)
+    while (read != -1) {
+      count += read
+      read = stream.read(buffer)
+    }
+    count
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to