This is an automated email from the ASF dual-hosted git repository.

engelen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new 0964a9b53 AMQP: Avoid allocating a new ByteString on each consumed 
message (#592)
0964a9b53 is described below

commit 0964a9b53177b06e4bf5763994a21288392e0693
Author: Diogo Moura <[email protected]>
AuthorDate: Mon Jun 24 09:24:37 2024 +0100

    AMQP: Avoid allocating a new ByteString on each consumed message (#592)
    
    * Avoid allocating a new ByteString on each consumed message
    
    Changes the ByteString constructor to a `fromArrayUnsafe`. It should be 
safe to use this unsafe method as long as `body` is not exposed to the outside.
    
    * add avoidArrayCopy setting
    
    * Create avoidArrayCopy.backwards.excludes
    
    * support avoidArrayCopy on write cases
    
    * add some testing for avoidArrayCopy
    
    * spelling
    
    * Update AmqpConnectorsSpec.scala
    
    * revert changes in one test
    
    * update tests
    
    * Update AmqpFlowTest.java
    
    * reformat if stmts
    
    * compile issue
    
    * rename param
    
    ---------
    
    Co-authored-by: PJ Fanning <[email protected]>
---
 .../avoidArrayCopy.backwards.excludes              |  23 +
 .../connectors/amqp/AmqpConnectorSettings.scala    |  54 +-
 .../impl/AbstractAmqpAsyncFlowStageLogic.scala     |   6 +-
 .../amqp/impl/AmqpReplyToSinkStage.scala           |   6 +-
 .../connectors/amqp/impl/AmqpRpcFlowStage.scala    |  14 +-
 .../connectors/amqp/impl/AmqpSimpleFlowStage.scala |   6 +-
 .../connectors/amqp/impl/AmqpSourceStage.scala     |   9 +-
 .../connectors/amqp/javadsl/AmqpFlowTest.java      |  24 +-
 .../scaladsl/AmqpConnectionProvidersSpec.scala     |   4 +-
 .../amqp/scaladsl/AmqpConnectorsSpec.scala         | 563 +++++++++++----------
 ...AmqpGraphStageLogicConnectionShutdownSpec.scala |   6 +-
 project/Dependencies.scala                         |   3 +-
 12 files changed, 430 insertions(+), 288 deletions(-)

diff --git 
a/amqp/src/main/mima-filters/1.1.x.backward.excludes/avoidArrayCopy.backwards.excludes
 
b/amqp/src/main/mima-filters/1.1.x.backward.excludes/avoidArrayCopy.backwards.excludes
new file mode 100644
index 000000000..641c87ec8
--- /dev/null
+++ 
b/amqp/src/main/mima-filters/1.1.x.backward.excludes/avoidArrayCopy.backwards.excludes
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# added reuseByteArray setting
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.connectors.amqp.AmqpConnectorSettings.reuseByteArray")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.amqp.AmqpReplyToSinkSettings.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.amqp.AmqpWriteSettings.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.amqp.NamedQueueSourceSettings.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.amqp.TemporaryQueueSourceSettings.this")
diff --git 
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectorSettings.scala
 
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectorSettings.scala
index 5ffc9e74c..f9278353a 100644
--- 
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectorSettings.scala
+++ 
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectorSettings.scala
@@ -28,6 +28,7 @@ import scala.concurrent.duration._
 sealed trait AmqpConnectorSettings {
   def connectionProvider: AmqpConnectionProvider
   def declarations: immutable.Seq[Declaration]
+  def reuseByteArray: Boolean
 }
 
 sealed trait AmqpSourceSettings extends AmqpConnectorSettings
@@ -40,7 +41,8 @@ final class NamedQueueSourceSettings private (
     val exclusive: Boolean = false,
     val ackRequired: Boolean = true,
     val consumerTag: String = "default",
-    val arguments: Map[String, AnyRef] = Map.empty) extends AmqpSourceSettings 
{
+    val arguments: Map[String, AnyRef] = Map.empty,
+    val reuseByteArray: Boolean = false) extends AmqpSourceSettings {
 
   def withDeclaration(declaration: Declaration): NamedQueueSourceSettings =
     copy(declarations = immutable.Seq(declaration))
@@ -79,12 +81,16 @@ final class NamedQueueSourceSettings private (
   def withArguments(arguments: java.util.Map[String, Object]): 
NamedQueueSourceSettings =
     copy(arguments = arguments.asScala.toMap)
 
+  def withReuseByteArray(reuseByteArray: Boolean): NamedQueueSourceSettings =
+    copy(reuseByteArray = reuseByteArray)
+
   private def copy(declarations: immutable.Seq[Declaration] = declarations,
       noLocal: Boolean = noLocal,
       exclusive: Boolean = exclusive,
       ackRequired: Boolean = ackRequired,
       consumerTag: String = consumerTag,
-      arguments: Map[String, AnyRef] = arguments) =
+      arguments: Map[String, AnyRef] = arguments,
+      reuseByteArray: Boolean = reuseByteArray) =
     new NamedQueueSourceSettings(
       connectionProvider,
       queue,
@@ -93,7 +99,8 @@ final class NamedQueueSourceSettings private (
       exclusive = exclusive,
       ackRequired = ackRequired,
       consumerTag = consumerTag,
-      arguments = arguments)
+      arguments = arguments,
+      reuseByteArray = reuseByteArray)
 
   override def toString: String =
     "NamedQueueSourceSettings(" +
@@ -105,6 +112,7 @@ final class NamedQueueSourceSettings private (
     s"ackRequired=$ackRequired, " +
     s"consumerTag=$consumerTag, " +
     s"arguments=$arguments" +
+    s"reuseByteArray=$reuseByteArray" +
     ")"
 }
 
@@ -123,7 +131,8 @@ final class TemporaryQueueSourceSettings private (
     val connectionProvider: AmqpConnectionProvider,
     val exchange: String,
     val declarations: immutable.Seq[Declaration] = Nil,
-    val routingKey: Option[String] = None) extends AmqpSourceSettings {
+    val routingKey: Option[String] = None,
+    val reuseByteArray: Boolean = false) extends AmqpSourceSettings {
 
   def withDeclaration(declaration: Declaration): TemporaryQueueSourceSettings =
     copy(declarations = immutable.Seq(declaration))
@@ -139,8 +148,14 @@ final class TemporaryQueueSourceSettings private (
 
   def withRoutingKey(routingKey: String): TemporaryQueueSourceSettings = 
copy(routingKey = Some(routingKey))
 
-  private def copy(declarations: immutable.Seq[Declaration] = declarations, 
routingKey: Option[String] = routingKey) =
-    new TemporaryQueueSourceSettings(connectionProvider, exchange, 
declarations = declarations, routingKey = routingKey)
+  def withReuseByteArray(reuseByteArray: Boolean): 
TemporaryQueueSourceSettings =
+    copy(reuseByteArray = reuseByteArray)
+
+  private def copy(declarations: immutable.Seq[Declaration] = declarations,
+      routingKey: Option[String] = routingKey,
+      reuseByteArray: Boolean = reuseByteArray) =
+    new TemporaryQueueSourceSettings(connectionProvider, exchange, 
declarations = declarations,
+      routingKey = routingKey, reuseByteArray = reuseByteArray)
 
   override def toString: String =
     "TemporaryQueueSourceSettings(" +
@@ -148,6 +163,7 @@ final class TemporaryQueueSourceSettings private (
     s"exchange=$exchange, " +
     s"declarations=$declarations, " +
     s"routingKey=$routingKey" +
+    s"reuseByteArray=$reuseByteArray" +
     ")"
 }
 
@@ -164,19 +180,26 @@ object TemporaryQueueSourceSettings {
 
 final class AmqpReplyToSinkSettings private (
     val connectionProvider: AmqpConnectionProvider,
-    val failIfReplyToMissing: Boolean = true) extends AmqpConnectorSettings {
+    val failIfReplyToMissing: Boolean = true,
+    val reuseByteArray: Boolean = false) extends AmqpConnectorSettings {
   override final val declarations = Nil
 
   def withFailIfReplyToMissing(failIfReplyToMissing: Boolean): 
AmqpReplyToSinkSettings =
     copy(failIfReplyToMissing = failIfReplyToMissing)
 
-  private def copy(connectionProvider: AmqpConnectionProvider = 
connectionProvider, failIfReplyToMissing: Boolean) =
-    new AmqpReplyToSinkSettings(connectionProvider, failIfReplyToMissing)
+  def withReuseByteArray(reuseByteArray: Boolean): AmqpReplyToSinkSettings =
+    copy(reuseByteArray = reuseByteArray)
+
+  private def copy(connectionProvider: AmqpConnectionProvider = 
connectionProvider,
+      failIfReplyToMissing: Boolean = failIfReplyToMissing,
+      reuseByteArray: Boolean = reuseByteArray) =
+    new AmqpReplyToSinkSettings(connectionProvider, failIfReplyToMissing, 
reuseByteArray)
 
   override def toString: String =
     "AmqpReplyToSinkSettings(" +
     s"connectionProvider=$connectionProvider, " +
     s"failIfReplyToMissing=$failIfReplyToMissing" +
+    s"reuseByteArray=$reuseByteArray" +
     ")"
 }
 
@@ -197,7 +220,8 @@ final class AmqpWriteSettings private (
     val routingKey: Option[String] = None,
     val declarations: immutable.Seq[Declaration] = Nil,
     val bufferSize: Int = 10,
-    val confirmationTimeout: FiniteDuration = 100.millis) extends 
AmqpConnectorSettings {
+    val confirmationTimeout: FiniteDuration = 100.millis,
+    val reuseByteArray: Boolean = false) extends AmqpConnectorSettings {
 
   def withExchange(exchange: String): AmqpWriteSettings =
     copy(exchange = Some(exchange))
@@ -211,6 +235,9 @@ final class AmqpWriteSettings private (
   def withDeclarations(declarations: immutable.Seq[Declaration]): 
AmqpWriteSettings =
     copy(declarations = declarations)
 
+  def withReuseByteArray(reuseByteArray: Boolean): AmqpWriteSettings =
+    copy(reuseByteArray = reuseByteArray)
+
   /**
    * Java API
    */
@@ -234,8 +261,10 @@ final class AmqpWriteSettings private (
       routingKey: Option[String] = routingKey,
       declarations: immutable.Seq[Declaration] = declarations,
       bufferSize: Int = bufferSize,
-      confirmationTimeout: FiniteDuration = confirmationTimeout) =
-    new AmqpWriteSettings(connectionProvider, exchange, routingKey, 
declarations, bufferSize, confirmationTimeout)
+      confirmationTimeout: FiniteDuration = confirmationTimeout,
+      reuseByteArray: Boolean = reuseByteArray) =
+    new AmqpWriteSettings(connectionProvider, exchange, routingKey, 
declarations, bufferSize,
+      confirmationTimeout, reuseByteArray)
 
   override def toString: String =
     "AmqpSinkSettings(" +
@@ -245,6 +274,7 @@ final class AmqpWriteSettings private (
     s"declarations=$declarations, " +
     s"bufferSize=$bufferSize, " +
     s"confirmationTimeout=$confirmationTimeout" +
+    s"reuseByteArray=$reuseByteArray" +
     ")"
 }
 
diff --git 
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AbstractAmqpAsyncFlowStageLogic.scala
 
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AbstractAmqpAsyncFlowStageLogic.scala
index c8d5ae721..e126289f5 100644
--- 
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AbstractAmqpAsyncFlowStageLogic.scala
+++ 
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AbstractAmqpAsyncFlowStageLogic.scala
@@ -164,13 +164,17 @@ import scala.concurrent.Promise
 
         log.debug("Publishing message {} with deliveryTag {}.", message, tag)
 
+        val bytes = if (settings.reuseByteArray)
+          message.bytes.toArrayUnsafe()
+        else
+          message.bytes.toArray
         channel.basicPublish(
           exchange,
           message.routingKey.getOrElse(routingKey),
           message.mandatory,
           message.immediate,
           message.properties.orNull,
-          message.bytes.toArray)
+          bytes)
 
         tag
       }
diff --git 
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpReplyToSinkStage.scala
 
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpReplyToSinkStage.scala
index e012e5e1d..43f2db389 100644
--- 
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpReplyToSinkStage.scala
+++ 
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpReplyToSinkStage.scala
@@ -75,13 +75,17 @@ private[amqp] final class 
AmqpReplyToSinkStage(replyToSinkSettings: AmqpReplyToS
               val replyTo = elem.properties.flatMap(properties => 
Option(properties.getReplyTo))
 
               if (replyTo.isDefined) {
+                val bytes = if (settings.reuseByteArray)
+                  elem.bytes.toArrayUnsafe()
+                else
+                  elem.bytes.toArray
                 channel.basicPublish(
                   elem.routingKey.getOrElse(""),
                   replyTo.get,
                   elem.mandatory,
                   elem.immediate,
                   elem.properties.orNull,
-                  elem.bytes.toArray)
+                  bytes)
               } else if (settings.failIfReplyToMissing) {
                 onFailure(new RuntimeException("Reply-to header was not set"))
               }
diff --git 
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpRpcFlowStage.scala
 
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpRpcFlowStage.scala
index 289ff27eb..4c5b453eb 100644
--- 
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpRpcFlowStage.scala
+++ 
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpRpcFlowStage.scala
@@ -105,7 +105,13 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: 
AmqpWriteSettings, buf
                 body: Array[Byte]): Unit =
               consumerCallback.invoke(
                 new CommittableReadResult {
-                  override val message = ReadResult(ByteString(body), 
envelope, properties)
+                  override val message = {
+                    val byteString = if (settings.reuseByteArray)
+                      ByteString.fromArrayUnsafe(body)
+                    else
+                      ByteString(body)
+                    ReadResult(byteString, envelope, properties)
+                  }
 
                   override def ack(multiple: Boolean): Future[Done] = {
                     val promise = Promise[Done]()
@@ -196,6 +202,10 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: 
AmqpWriteSettings, buf
 
             override def onPush(): Unit = {
               val elem = grab(in)
+              val bytes = if (settings.reuseByteArray)
+                elem.bytes.toArrayUnsafe()
+              else
+                elem.bytes.toArray
               val props = elem.properties.getOrElse(new 
BasicProperties()).builder.replyTo(queueName).build()
               channel.basicPublish(
                 exchange,
@@ -203,7 +213,7 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: 
AmqpWriteSettings, buf
                 elem.mandatory,
                 elem.immediate,
                 props,
-                elem.bytes.toArray)
+                bytes)
 
               val expectedResponses: Int = {
                 val headers = props.getHeaders
diff --git 
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSimpleFlowStage.scala
 
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSimpleFlowStage.scala
index 5d24b4447..01333c133 100644
--- 
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSimpleFlowStage.scala
+++ 
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSimpleFlowStage.scala
@@ -49,13 +49,17 @@ import scala.concurrent.{ Future, Promise }
         override def publish(message: WriteMessage, passThrough: T): Unit = {
           log.debug("Publishing message {}.", message)
 
+          val bytes = if (settings.reuseByteArray)
+            message.bytes.toArrayUnsafe()
+          else
+            message.bytes.toArray
           channel.basicPublish(
             settings.exchange.getOrElse(""),
             message.routingKey.orElse(settings.routingKey).getOrElse(""),
             message.mandatory,
             message.immediate,
             message.properties.orNull,
-            message.bytes.toArray)
+            bytes)
           push(out, (WriteResult.confirmed, passThrough))
         }
       }, streamCompletion.future)
diff --git 
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSourceStage.scala
 
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSourceStage.scala
index 2e950ef95..63ddfb79d 100644
--- 
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSourceStage.scala
+++ 
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSourceStage.scala
@@ -93,10 +93,15 @@ private[amqp] final class AmqpSourceStage(settings: 
AmqpSourceSettings, bufferSi
               envelope: Envelope,
               properties: BasicProperties,
               body: Array[Byte]): Unit = {
+            val byteString = if (settings.reuseByteArray)
+              ByteString.fromArrayUnsafe(body)
+            else
+              ByteString(body)
+
             val message = if (ackRequired) {
 
               new CommittableReadResult {
-                override val message = ReadResult(ByteString(body), envelope, 
properties)
+                override val message = ReadResult(byteString, envelope, 
properties)
 
                 override def ack(multiple: Boolean): Future[Done] = {
                   val promise = Promise[Done]()
@@ -110,7 +115,7 @@ private[amqp] final class AmqpSourceStage(settings: 
AmqpSourceSettings, bufferSi
                   promise.future
                 }
               }
-            } else new AutoAckedReadResult(ReadResult(ByteString(body), 
envelope, properties))
+            } else new AutoAckedReadResult(ReadResult(byteString, envelope, 
properties))
             consumerCallback.invoke(message)
           }
 
diff --git 
a/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpFlowTest.java
 
b/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpFlowTest.java
index f7a673823..06f5e435f 100644
--- 
a/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpFlowTest.java
+++ 
b/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpFlowTest.java
@@ -19,10 +19,15 @@ import java.util.List;
 import java.util.concurrent.CompletionStage;
 import java.util.stream.Collectors;
 
-import org.apache.pekko.stream.connectors.testkit.javadsl.LogCapturingJunit4;
+import scala.collection.JavaConverters;
+
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
 import org.apache.pekko.Done;
 import org.apache.pekko.actor.ActorSystem;
@@ -32,6 +37,7 @@ import 
org.apache.pekko.stream.connectors.amqp.AmqpWriteSettings;
 import org.apache.pekko.stream.connectors.amqp.QueueDeclaration;
 import org.apache.pekko.stream.connectors.amqp.WriteMessage;
 import org.apache.pekko.stream.connectors.amqp.WriteResult;
+import org.apache.pekko.stream.connectors.testkit.javadsl.LogCapturingJunit4;
 import org.apache.pekko.stream.javadsl.Flow;
 import org.apache.pekko.stream.javadsl.FlowWithContext;
 import org.apache.pekko.stream.javadsl.Keep;
@@ -39,11 +45,22 @@ import org.apache.pekko.stream.javadsl.Source;
 import org.apache.pekko.stream.testkit.TestSubscriber;
 import org.apache.pekko.stream.testkit.javadsl.TestSink;
 import org.apache.pekko.util.ByteString;
-import scala.collection.JavaConverters;
 
 /** Needs a local running AMQP server on the default port with no password. */
+@RunWith(Parameterized.class)
 public class AmqpFlowTest {
 
+  @Parameters
+  public static Iterable<? extends Object> data() {
+    return Arrays.asList(false, true);
+  }
+
+  /**
+   * This value is initialized with values from data() array
+   */
+  @Parameter
+  public boolean reuseByteArray;
+
   @Rule public final LogCapturingJunit4 logCapturing = new 
LogCapturingJunit4();
 
   private static ActorSystem system;
@@ -53,13 +70,14 @@ public class AmqpFlowTest {
     system = ActorSystem.create();
   }
 
-  private static AmqpWriteSettings settings() {
+  private AmqpWriteSettings settings() {
     final String queueName = "amqp-flow-spec" + System.currentTimeMillis();
     final QueueDeclaration queueDeclaration = 
QueueDeclaration.create(queueName);
 
     return AmqpWriteSettings.create(AmqpLocalConnectionProvider.getInstance())
         .withRoutingKey(queueName)
         .withDeclaration(queueDeclaration)
+        .withReuseByteArray(reuseByteArray)
         .withBufferSize(10)
         .withConfirmationTimeout(Duration.ofMillis(200));
   }
diff --git 
a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectionProvidersSpec.scala
 
b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectionProvidersSpec.scala
index 0f6b9ef44..1770cf9de 100644
--- 
a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectionProvidersSpec.scala
+++ 
b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectionProvidersSpec.scala
@@ -200,9 +200,9 @@ class AmqpConnectionProvidersSpec extends AmqpSpec {
       val connectionProvider = AmqpDetailsConnectionProvider("localhost", 5673)
       val reusableConnectionProvider = 
AmqpCachedConnectionProvider(connectionProvider).withAutomaticRelease(false)
       try reusableConnectionProvider.get
-      catch { case e: Throwable => e shouldBe an[ConnectException] }
+      catch { case e: Throwable => e shouldBe a[ConnectException] }
       try reusableConnectionProvider.get
-      catch { case e: Throwable => e shouldBe an[ConnectException] }
+      catch { case e: Throwable => e shouldBe a[ConnectException] }
     }
   }
 }
diff --git 
a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
 
b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
index a3a73c52e..dad8f3bb7 100644
--- 
a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
+++ 
b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
@@ -26,6 +26,7 @@ import pekko.stream.testkit.{ TestPublisher, TestSubscriber }
 import pekko.util.ByteString
 import com.rabbitmq.client.AMQP.BasicProperties
 import com.rabbitmq.client.AuthenticationFailureException
+import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks
 
 import scala.concurrent.Await
 import scala.concurrent.duration._
@@ -34,7 +35,7 @@ import scala.collection.immutable
 /**
  * Needs a local running AMQP server on the default port with no password.
  */
-class AmqpConnectorsSpec extends AmqpSpec {
+class AmqpConnectorsSpec extends AmqpSpec with ScalaCheckDrivenPropertyChecks {
 
   override implicit val patienceConfig: PatienceConfig = 
PatienceConfig(10.seconds)
 
@@ -43,78 +44,89 @@ class AmqpConnectorsSpec extends AmqpSpec {
     val connectionProvider = AmqpLocalConnectionProvider
 
     "connection should fail to wrong broker" in assertAllStagesStopped {
-      val connectionProvider = AmqpDetailsConnectionProvider("localhost", 5673)
+      forAll { (reuseByteArray: Boolean) =>
+        val connectionProvider = AmqpDetailsConnectionProvider("localhost", 
5673)
 
-      val queueName = "amqp-conn-it-spec-simple-queue-" + 
System.currentTimeMillis()
-      val queueDeclaration = QueueDeclaration(queueName)
+        val queueName = "amqp-conn-it-spec-simple-queue-" + 
System.currentTimeMillis()
+        val queueDeclaration = QueueDeclaration(queueName)
 
-      val amqpSink = AmqpSink.simple(
-        AmqpWriteSettings(connectionProvider)
-          .withRoutingKey(queueName)
-          .withDeclaration(queueDeclaration))
+        val amqpSink = AmqpSink.simple(
+          AmqpWriteSettings(connectionProvider)
+            .withRoutingKey(queueName)
+            .withDeclaration(queueDeclaration)
+            .withReuseByteArray(reuseByteArray))
 
-      val input = Vector("one", "two", "three", "four", "five")
-      val result = Source(input).map(s => ByteString(s)).runWith(amqpSink)
-      result.failed.futureValue shouldBe an[ConnectException]
+        val input = Vector("one", "two", "three", "four", "five")
+        val result = Source(input).map(s => ByteString(s)).runWith(amqpSink)
+        result.failed.futureValue shouldBe a[ConnectException]
+      }
     }
 
     "connection should fail with wrong credentials" in assertAllStagesStopped {
-      val connectionProvider =
-        AmqpDetailsConnectionProvider("invalid", 5673)
-          .withHostsAndPorts(immutable.Seq("localhost" -> 5672))
-          .withCredentials(AmqpCredentials("guest", "guest1"))
-
-      val queueName = "amqp-conn-it-spec-simple-queue-" + 
System.currentTimeMillis()
-      val queueDeclaration = QueueDeclaration(queueName)
-
-      val amqpSink = AmqpSink.simple(
-        AmqpWriteSettings(connectionProvider)
-          .withRoutingKey(queueName)
-          .withDeclaration(queueDeclaration))
-
-      val input = Vector("one", "two", "three", "four", "five")
-      val result = Source(input).map(s => ByteString(s)).runWith(amqpSink)
-      result.failed.futureValue shouldBe an[AuthenticationFailureException]
+      forAll { (reuseByteArray: Boolean) =>
+        val connectionProvider =
+          AmqpDetailsConnectionProvider("invalid", 5673)
+            .withHostsAndPorts(immutable.Seq("localhost" -> 5672))
+            .withCredentials(AmqpCredentials("guest", "guest1"))
+
+        val queueName = "amqp-conn-it-spec-simple-queue-" + 
System.currentTimeMillis()
+        val queueDeclaration = QueueDeclaration(queueName)
+
+        val amqpSink = AmqpSink.simple(
+          AmqpWriteSettings(connectionProvider)
+            .withRoutingKey(queueName)
+            .withDeclaration(queueDeclaration)
+            .withReuseByteArray(reuseByteArray))
+
+        val input = Vector("one", "two", "three", "four", "five")
+        val result = Source(input).map(s => ByteString(s)).runWith(amqpSink)
+        result.failed.futureValue shouldBe an[AuthenticationFailureException]
+      }
     }
 
     "publish via RPC which expects 2 responses per message and then consume 
through a simple queue again in the same JVM" in assertAllStagesStopped {
-      val queueName = "amqp-conn-it-spec-rpc-queue-" + 
System.currentTimeMillis()
-      val queueDeclaration = QueueDeclaration(queueName)
-
-      val amqpRpcFlow = AmqpRpcFlow.simple(
-        AmqpWriteSettings(connectionProvider)
-          .withRoutingKey(queueName)
-          .withDeclaration(queueDeclaration),
-        2)
-
-      val amqpSource = AmqpSource.atMostOnceSource(
-        NamedQueueSourceSettings(connectionProvider, queueName),
-        bufferSize = 1)
-
-      val input = Vector("one", "two", "three", "four", "five")
-      val (rpcQueueF, probe) =
-        Source(input).map(s => 
ByteString(s)).viaMat(amqpRpcFlow)(Keep.right).toMat(TestSink.probe)(Keep.both).run()
-      rpcQueueF.futureValue
-
-      val amqpSink = AmqpSink.replyTo(
-        AmqpReplyToSinkSettings(connectionProvider))
-
-      val sourceToSink = amqpSource
-        .viaMat(KillSwitches.single)(Keep.right)
-        .mapConcat { b =>
-          List(
-            
WriteMessage(b.bytes.concat(ByteString("a"))).withProperties(b.properties),
-            
WriteMessage(b.bytes.concat(ByteString("aa"))).withProperties(b.properties))
-        }
-        .to(amqpSink)
-        .run()
+      forAll { (reuseByteArray: Boolean) =>
+        val queueName = "amqp-conn-it-spec-rpc-queue-" + 
System.currentTimeMillis()
+        val queueDeclaration = QueueDeclaration(queueName)
+
+        val amqpRpcFlow = AmqpRpcFlow.simple(
+          AmqpWriteSettings(connectionProvider)
+            .withRoutingKey(queueName)
+            .withDeclaration(queueDeclaration)
+            .withReuseByteArray(reuseByteArray),
+          2)
+
+        val amqpSource = AmqpSource.atMostOnceSource(
+          NamedQueueSourceSettings(connectionProvider, queueName)
+            .withReuseByteArray(reuseByteArray),
+          bufferSize = 1)
+
+        val input = Vector("one", "two", "three", "four", "five")
+        val (rpcQueueF, probe) =
+          Source(input).map(s => 
ByteString(s)).viaMat(amqpRpcFlow)(Keep.right).toMat(TestSink.probe)(Keep.both).run()
+        rpcQueueF.futureValue
+
+        val amqpSink = AmqpSink.replyTo(
+          AmqpReplyToSinkSettings(connectionProvider)
+            .withReuseByteArray(reuseByteArray))
+
+        val sourceToSink = amqpSource
+          .viaMat(KillSwitches.single)(Keep.right)
+          .mapConcat { b =>
+            List(
+              
WriteMessage(b.bytes.concat(ByteString("a"))).withProperties(b.properties),
+              
WriteMessage(b.bytes.concat(ByteString("aa"))).withProperties(b.properties))
+          }
+          .to(amqpSink)
+          .run()
 
-      probe
-        .request(10)
-        .expectNextUnorderedN(input.flatMap(s => 
List(ByteString(s.concat("a")), ByteString(s.concat("aa")))))
-        .expectComplete()
+        probe
+          .request(10)
+          .expectNextUnorderedN(input.flatMap(s => 
List(ByteString(s.concat("a")), ByteString(s.concat("aa")))))
+          .expectComplete()
 
-      sourceToSink.shutdown()
+        sourceToSink.shutdown()
+      }
     }
 
     "correctly close a AmqpRpcFlow when stream is closed without passing any 
elements" in assertAllStagesStopped {
@@ -163,252 +175,283 @@ class AmqpConnectorsSpec extends AmqpSpec {
           Keep.right)
         .run()
         .futureValue shouldBe Done
-
     }
 
     "publish from one source and consume elements with multiple sinks" in 
assertAllStagesStopped {
-      val queueName = "amqp-conn-it-spec-work-queues-" + System.currentTimeMillis()
-      val queueDeclaration = QueueDeclaration(queueName)
-      val amqpSink = AmqpSink.simple(
-        AmqpWriteSettings(connectionProvider)
-          .withRoutingKey(queueName)
-          .withDeclaration(queueDeclaration))
-
-      val input = Vector("one", "two", "three", "four", "five")
-      Source(input).map(s => ByteString(s)).runWith(amqpSink)
-
-      val mergedSources = Source.fromGraph(GraphDSL.create() { implicit b =>
-        import GraphDSL.Implicits._
-        val count = 3
-        val merge = b.add(Merge[ReadResult](count))
-        for (n <- 0 until count) {
-          val source = b.add(
-            AmqpSource.atMostOnceSource(
-              NamedQueueSourceSettings(connectionProvider, queueName)
-                .withDeclaration(queueDeclaration),
-              bufferSize = 1))
-          source.out ~> merge.in(n)
-        }
-
-        SourceShape(merge.out)
-      })
-
-      val result = mergedSources.map(_.bytes.utf8String).take(input.size).runWith(Sink.seq)
-
-      result.futureValue.sorted shouldEqual input.sorted
+      forAll { (reuseByteArray: Boolean) =>
+        val queueName = "amqp-conn-it-spec-work-queues-" + System.currentTimeMillis()
+        val queueDeclaration = QueueDeclaration(queueName)
+        val amqpSink = AmqpSink.simple(
+          AmqpWriteSettings(connectionProvider)
+            .withRoutingKey(queueName)
+            .withDeclaration(queueDeclaration)
+            .withReuseByteArray(reuseByteArray))
+
+        val input = Vector("one", "two", "three", "four", "five")
+        Source(input).map(s => ByteString(s)).runWith(amqpSink)
+
+        val mergedSources = Source.fromGraph(GraphDSL.create() { implicit b =>
+          import GraphDSL.Implicits._
+          val count = 3
+          val merge = b.add(Merge[ReadResult](count))
+          for (n <- 0 until count) {
+            val source = b.add(
+              AmqpSource.atMostOnceSource(
+                NamedQueueSourceSettings(connectionProvider, queueName)
+                  .withDeclaration(queueDeclaration)
+                  .withReuseByteArray(reuseByteArray),
+                bufferSize = 1))
+            source.out ~> merge.in(n)
+          }
+
+          SourceShape(merge.out)
+        })
+
+        val result = mergedSources.map(_.bytes.utf8String).take(input.size).runWith(Sink.seq)
+
+        result.futureValue.sorted shouldEqual input.sorted
+      }
     }
 
     "not fail on a fast producer and a slow consumer" in 
assertAllStagesStopped {
-      val queueName = "amqp-conn-it-spec-simple-queue-2-" + System.currentTimeMillis()
-      val queueDeclaration = QueueDeclaration(queueName)
-      val amqpSource = AmqpSource.atMostOnceSource(
-        NamedQueueSourceSettings(connectionProvider, queueName).withDeclaration(queueDeclaration),
-        bufferSize = 2)
+      forAll { (reuseByteArray: Boolean) =>
+        val queueName = "amqp-conn-it-spec-simple-queue-2-" + System.currentTimeMillis()
+        val queueDeclaration = QueueDeclaration(queueName)
+        val amqpSource = AmqpSource.atMostOnceSource(
+          NamedQueueSourceSettings(connectionProvider, queueName)
+            .withDeclaration(queueDeclaration)
+            .withReuseByteArray(reuseByteArray),
+          bufferSize = 2)
 
-      val amqpSink = AmqpSink.simple(
-        AmqpWriteSettings(connectionProvider)
-          .withRoutingKey(queueName)
-          .withDeclaration(queueDeclaration))
+        val amqpSink = AmqpSink.simple(
+          AmqpWriteSettings(connectionProvider)
+            .withRoutingKey(queueName)
+            .withDeclaration(queueDeclaration)
+            .withReuseByteArray(reuseByteArray))
 
-      val publisher = TestPublisher.probe[ByteString]()
-      val subscriber = TestSubscriber.probe[ReadResult]()
-      Source.fromPublisher(publisher).to(amqpSink).addAttributes(Attributes.inputBuffer(1, 1)).run()
-      amqpSource.to(Sink.fromSubscriber(subscriber)).addAttributes(Attributes.inputBuffer(1, 1)).run()
+        val publisher = TestPublisher.probe[ByteString]()
+        val subscriber = TestSubscriber.probe[ReadResult]()
+        Source.fromPublisher(publisher).to(amqpSink).addAttributes(Attributes.inputBuffer(1, 1)).run()
+        amqpSource.to(Sink.fromSubscriber(subscriber)).addAttributes(Attributes.inputBuffer(1, 1)).run()
 
-      // note that this essentially is testing rabbitmq just as much as it tests our sink and source
-      publisher.ensureSubscription()
-      subscriber.ensureSubscription()
+        // note that this essentially is testing rabbitmq just as much as it tests our sink and source
+        publisher.ensureSubscription()
+        subscriber.ensureSubscription()
 
-      publisher.expectRequest() shouldEqual 1
-      publisher.sendNext(ByteString("one"))
+        publisher.expectRequest() shouldEqual 1
+        publisher.sendNext(ByteString("one"))
 
-      publisher.expectRequest()
-      publisher.sendNext(ByteString("two"))
+        publisher.expectRequest()
+        publisher.sendNext(ByteString("two"))
 
-      publisher.expectRequest()
-      publisher.sendNext(ByteString("three"))
+        publisher.expectRequest()
+        publisher.sendNext(ByteString("three"))
 
-      publisher.expectRequest()
-      publisher.sendNext(ByteString("four"))
+        publisher.expectRequest()
+        publisher.sendNext(ByteString("four"))
 
-      publisher.expectRequest()
-      publisher.sendNext(ByteString("five"))
+        publisher.expectRequest()
+        publisher.sendNext(ByteString("five"))
 
-      subscriber.request(4)
-      subscriber.expectNext().bytes.utf8String shouldEqual "one"
-      subscriber.expectNext().bytes.utf8String shouldEqual "two"
-      subscriber.expectNext().bytes.utf8String shouldEqual "three"
-      subscriber.expectNext().bytes.utf8String shouldEqual "four"
+        subscriber.request(4)
+        subscriber.expectNext().bytes.utf8String shouldEqual "one"
+        subscriber.expectNext().bytes.utf8String shouldEqual "two"
+        subscriber.expectNext().bytes.utf8String shouldEqual "three"
+        subscriber.expectNext().bytes.utf8String shouldEqual "four"
 
-      subscriber.request(1)
-      subscriber.expectNext().bytes.utf8String shouldEqual "five"
+        subscriber.request(1)
+        subscriber.expectNext().bytes.utf8String shouldEqual "five"
 
-      subscriber.cancel()
-      publisher.sendComplete()
-      succeed
+        subscriber.cancel()
+        publisher.sendComplete()
+        succeed
+      }
     }
 
     "keep connection open if downstream closes and there are pending acks" in 
assertAllStagesStopped {
-      val connectionSettings = AmqpDetailsConnectionProvider("localhost", 5672)
+      forAll { (reuseByteArray: Boolean) =>
+        val connectionSettings = AmqpDetailsConnectionProvider("localhost", 5672)
+
+        val queueName = "amqp-conn-it-spec-simple-queue-" + System.currentTimeMillis()
+        val queueDeclaration = QueueDeclaration(queueName)
+
+        val amqpSink = AmqpSink.simple(
+          AmqpWriteSettings(connectionSettings)
+            .withRoutingKey(queueName)
+            .withDeclaration(queueDeclaration)
+            .withReuseByteArray(reuseByteArray))
+
+        val amqpSource = AmqpSource.committableSource(
+          NamedQueueSourceSettings(connectionSettings, queueName)
+            .withDeclaration(queueDeclaration)
+            .withReuseByteArray(reuseByteArray),
+          bufferSize = 10)
 
-      val queueName = "amqp-conn-it-spec-simple-queue-" + System.currentTimeMillis()
-      val queueDeclaration = QueueDeclaration(queueName)
+        val input = Vector("one", "two", "three", "four", "five")
+        Source(input).map(s => ByteString(s)).runWith(amqpSink).futureValue shouldEqual Done
 
-      val amqpSink = AmqpSink.simple(
-        AmqpWriteSettings(connectionSettings)
-          .withRoutingKey(queueName)
-          .withDeclaration(queueDeclaration))
+        val result = amqpSource
+          .take(input.size)
+          .runWith(Sink.seq)
 
-      val amqpSource = AmqpSource.committableSource(
-        NamedQueueSourceSettings(connectionSettings, queueName).withDeclaration(queueDeclaration),
-        bufferSize = 10)
+        result.futureValue.map(cm => {
+          noException should be thrownBy cm.ack().futureValue
+        })
+      }
+    }
 
-      val input = Vector("one", "two", "three", "four", "five")
-      Source(input).map(s => ByteString(s)).runWith(amqpSink).futureValue shouldEqual Done
+    "not republish message without autoAck(false) if nack is sent" in assertAllStagesStopped {
+      forAll { (reuseByteArray: Boolean) =>
+        val queueName = "amqp-conn-it-spec-simple-queue-" + System.currentTimeMillis()
+        val queueDeclaration = QueueDeclaration(queueName)
+
+        val amqpSink = AmqpSink.simple(
+          AmqpWriteSettings(connectionProvider)
+            .withRoutingKey(queueName)
+            .withDeclaration(queueDeclaration)
+            .withReuseByteArray(reuseByteArray))
+        val input = Vector("one", "two", "three", "four", "five")
+        Source(input).map(s => ByteString(s)).runWith(amqpSink).futureValue shouldEqual Done
+
+        val amqpSource = AmqpSource.committableSource(
+          NamedQueueSourceSettings(connectionProvider, queueName)
+            .withDeclaration(queueDeclaration)
+            .withReuseByteArray(reuseByteArray),
+          bufferSize = 10)
 
-      val result = amqpSource
-        .take(input.size)
-        .runWith(Sink.seq)
+        val result1 = amqpSource
+          .mapAsync(1)(cm => cm.nack(requeue = false).map(_ => cm))
+          .take(input.size)
+          .runWith(Sink.seq)
 
-      result.futureValue.map(cm => {
-        noException should be thrownBy cm.ack().futureValue
-      })
-    }
+        Await.ready(result1, 3.seconds)
 
-    "not republish message without autoAck(false) if nack is sent" in assertAllStagesStopped {
-      val queueName = "amqp-conn-it-spec-simple-queue-" + System.currentTimeMillis()
-      val queueDeclaration = QueueDeclaration(queueName)
-
-      val amqpSink = AmqpSink.simple(
-        AmqpWriteSettings(connectionProvider)
-          .withRoutingKey(queueName)
-          .withDeclaration(queueDeclaration))
-      val input = Vector("one", "two", "three", "four", "five")
-      Source(input).map(s => ByteString(s)).runWith(amqpSink).futureValue shouldEqual Done
-
-      val amqpSource = AmqpSource.committableSource(
-        NamedQueueSourceSettings(connectionProvider, queueName).withDeclaration(queueDeclaration),
-        bufferSize = 10)
-
-      val result1 = amqpSource
-        .mapAsync(1)(cm => cm.nack(requeue = false).map(_ => cm))
-        .take(input.size)
-        .runWith(Sink.seq)
-
-      Await.ready(result1, 3.seconds)
-
-      val (sourceToSeq, result2) = amqpSource
-        .viaMat(KillSwitches.single)(Keep.right)
-        .mapAsync(1)(cm => cm.ack().map(_ => cm))
-        .take(input.size)
-        .toMat(Sink.seq)(Keep.both)
-        .run()
+        val (sourceToSeq, result2) = amqpSource
+          .viaMat(KillSwitches.single)(Keep.right)
+          .mapAsync(1)(cm => cm.ack().map(_ => cm))
+          .take(input.size)
+          .toMat(Sink.seq)(Keep.both)
+          .run()
 
-      result2.isReadyWithin(1.second) shouldEqual false
-      sourceToSeq.shutdown()
+        result2.isReadyWithin(1.second) shouldEqual false
+        sourceToSeq.shutdown()
+      }
     }
 
     "publish via RPC and then consume through a simple queue again in the same 
JVM without autoAck" in assertAllStagesStopped {
 
-      val queueName = "amqp-conn-it-spec-rpc-queue-" + System.currentTimeMillis()
-      val queueDeclaration = QueueDeclaration(queueName)
+      forAll { (reuseByteArray: Boolean) =>
+        val queueName = "amqp-conn-it-spec-rpc-queue-" + System.currentTimeMillis()
+        val queueDeclaration = QueueDeclaration(queueName)
 
-      val input = Vector("one", "two", "three", "four", "five")
+        val input = Vector("one", "two", "three", "four", "five")
 
-      val amqpRpcFlow = AmqpRpcFlow.committableFlow(
-        AmqpWriteSettings(connectionProvider)
-          .withRoutingKey(queueName)
-          .withDeclaration(queueDeclaration),
-        bufferSize = 10)
-      val (rpcQueueF, probe) =
-        Source(input)
-          .map(s => ByteString(s))
-          .map(bytes => WriteMessage(bytes))
-          .viaMat(amqpRpcFlow)(Keep.right)
-          .mapAsync(1)(cm => cm.ack().map(_ => cm.message))
-          .toMat(TestSink.probe)(Keep.both)
+        val amqpRpcFlow = AmqpRpcFlow.committableFlow(
+          AmqpWriteSettings(connectionProvider)
+            .withRoutingKey(queueName)
+            .withDeclaration(queueDeclaration)
+            .withReuseByteArray(reuseByteArray),
+          bufferSize = 10)
+        val (rpcQueueF, probe) =
+          Source(input)
+            .map(s => ByteString(s))
+            .map(bytes => WriteMessage(bytes))
+            .viaMat(amqpRpcFlow)(Keep.right)
+            .mapAsync(1)(cm => cm.ack().map(_ => cm.message))
+            .toMat(TestSink.probe)(Keep.both)
+            .run()
+        rpcQueueF.futureValue
+
+        val amqpSink = AmqpSink.replyTo(
+          AmqpReplyToSinkSettings(connectionProvider)
+            .withReuseByteArray(reuseByteArray))
+
+        val amqpSource = AmqpSource.atMostOnceSource(
+          NamedQueueSourceSettings(connectionProvider, queueName)
+            .withReuseByteArray(reuseByteArray),
+          bufferSize = 1)
+        val sourceToSink = amqpSource
+          .viaMat(KillSwitches.single)(Keep.right)
+          .map(b => WriteMessage(b.bytes).withProperties(b.properties))
+          .to(amqpSink)
           .run()
-      rpcQueueF.futureValue
-
-      val amqpSink = AmqpSink.replyTo(
-        AmqpReplyToSinkSettings(connectionProvider))
-
-      val amqpSource = AmqpSource.atMostOnceSource(
-        NamedQueueSourceSettings(connectionProvider, queueName),
-        bufferSize = 1)
-      val sourceToSink = amqpSource
-        .viaMat(KillSwitches.single)(Keep.right)
-        .map(b => WriteMessage(b.bytes).withProperties(b.properties))
-        .to(amqpSink)
-        .run()
 
-      probe.toStrict(3.second).map(_.bytes.utf8String) shouldEqual input
-      sourceToSink.shutdown()
+        probe.toStrict(3.seconds).map(_.bytes.utf8String) shouldEqual input
+        sourceToSink.shutdown()
+      }
     }
 
     "set routing key per message and consume them in the same JVM" in 
assertAllStagesStopped {
       def getRoutingKey(s: String) = s"key.$s"
 
-      val exchangeName = "amqp.topic." + System.currentTimeMillis()
-      val queueName = "amqp-conn-it-spec-simple-queue-" + System.currentTimeMillis()
-      val exchangeDeclaration = ExchangeDeclaration(exchangeName, "topic")
-      val queueDeclaration = QueueDeclaration(queueName)
-      val bindingDeclaration = BindingDeclaration(queueName, exchangeName).withRoutingKey(getRoutingKey("*"))
-
-      val amqpSink = AmqpSink(
-        AmqpWriteSettings(connectionProvider)
-          .withExchange(exchangeName)
-          .withDeclarations(immutable.Seq(exchangeDeclaration, queueDeclaration, bindingDeclaration)))
-
-      val amqpSource = AmqpSource.atMostOnceSource(
-        NamedQueueSourceSettings(connectionProvider, queueName)
-          .withDeclarations(immutable.Seq(exchangeDeclaration, queueDeclaration, bindingDeclaration)),
-        bufferSize = 10)
-
-      val input = Vector("one", "two", "three", "four", "five")
-      val routingKeys = input.map(s => getRoutingKey(s))
-      Source(input)
-        .map(s => WriteMessage(ByteString(s)).withRoutingKey(getRoutingKey(s)))
-        .runWith(amqpSink)
-        .futureValue shouldEqual Done
-
-      val result = amqpSource
-        .take(input.size)
-        .runWith(Sink.seq)
-        .futureValue
-
-      result.map(_.envelope.getRoutingKey) shouldEqual routingKeys
-      result.map(_.bytes.utf8String) shouldEqual input
-    }
-
-    "declare connection that does not require server acks" in assertAllStagesStopped {
-      val connectionProvider =
-        AmqpDetailsConnectionProvider("localhost", 5672)
+      forAll { (reuseByteArray: Boolean) =>
+        val exchangeName = "amqp.topic." + System.currentTimeMillis()
+        val queueName = "amqp-conn-it-spec-simple-queue-" + System.currentTimeMillis()
+        val exchangeDeclaration = ExchangeDeclaration(exchangeName, "topic")
+        val queueDeclaration = QueueDeclaration(queueName)
+        val bindingDeclaration = BindingDeclaration(queueName, exchangeName).withRoutingKey(getRoutingKey("*"))
 
-      val queueName = "amqp-conn-it-spec-fire-and-forget-" + System.currentTimeMillis()
-      val queueDeclaration = QueueDeclaration(queueName)
+        val amqpSink = AmqpSink(
+          AmqpWriteSettings(connectionProvider)
+            .withExchange(exchangeName)
+            .withDeclarations(immutable.Seq(exchangeDeclaration, queueDeclaration, bindingDeclaration))
+            .withReuseByteArray(reuseByteArray))
 
-      val amqpSink = AmqpSink.simple(
-        AmqpWriteSettings(connectionProvider)
-          .withRoutingKey(queueName)
-          .withDeclaration(queueDeclaration))
-
-      val amqpSource = AmqpSource
-        .committableSource(
+        val amqpSource = AmqpSource.atMostOnceSource(
           NamedQueueSourceSettings(connectionProvider, queueName)
-            .withAckRequired(false)
-            .withDeclaration(queueDeclaration),
+            .withDeclarations(immutable.Seq(exchangeDeclaration, queueDeclaration, bindingDeclaration))
+            .withReuseByteArray(reuseByteArray),
           bufferSize = 10)
 
-      val input = Vector("one", "two", "three", "four", "five")
-      Source(input).map(s => ByteString(s)).runWith(amqpSink).futureValue shouldEqual Done
+        val input = Vector("one", "two", "three", "four", "five")
+        val routingKeys = input.map(s => getRoutingKey(s))
+        Source(input)
+          .map(s => WriteMessage(ByteString(s)).withRoutingKey(getRoutingKey(s)))
+          .runWith(amqpSink)
+          .futureValue shouldEqual Done
+
+        val result = amqpSource
+          .take(input.size)
+          .runWith(Sink.seq)
+          .futureValue
 
-      val result = amqpSource
-        .take(input.size)
-        .runWith(Sink.seq)
+        result.map(_.envelope.getRoutingKey) shouldEqual routingKeys
+        result.map(_.bytes.utf8String) shouldEqual input
+      }
+    }
 
-      val received = result.futureValue
-      received.map(_.message.bytes.utf8String) shouldEqual input
+    "declare connection that does not require server acks" in assertAllStagesStopped {
+      val connectionProvider =
+        AmqpDetailsConnectionProvider("localhost", 5672)
+
+      forAll { (reuseByteArray: Boolean) =>
+        val queueName = "amqp-conn-it-spec-fire-and-forget-" + System.currentTimeMillis()
+        val queueDeclaration = QueueDeclaration(queueName)
+
+        val amqpSink = AmqpSink.simple(
+          AmqpWriteSettings(connectionProvider)
+            .withRoutingKey(queueName)
+            .withDeclaration(queueDeclaration)
+            .withReuseByteArray(reuseByteArray))
+
+        val amqpSource = AmqpSource
+          .committableSource(
+            NamedQueueSourceSettings(connectionProvider, queueName)
+              .withAckRequired(false)
+              .withDeclaration(queueDeclaration)
+              .withReuseByteArray(reuseByteArray),
+            bufferSize = 10)
+
+        val input = Vector("one", "two", "three", "four", "five")
+        Source(input).map(s => ByteString(s)).runWith(amqpSink).futureValue shouldEqual Done
+
+        val result = amqpSource
+          .take(input.size)
+          .runWith(Sink.seq)
+
+        val received = result.futureValue
+        received.map(_.message.bytes.utf8String) shouldEqual input
+      }
     }
   }
 }
diff --git a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpGraphStageLogicConnectionShutdownSpec.scala b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpGraphStageLogicConnectionShutdownSpec.scala
index 23be429ca..b01678a1a 100644
--- a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpGraphStageLogicConnectionShutdownSpec.scala
+++ b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpGraphStageLogicConnectionShutdownSpec.scala
@@ -29,14 +29,14 @@ import pekko.stream.connectors.testkit.scaladsl.LogCapturing
 import pekko.stream.scaladsl.Source
 import pekko.util.ByteString
 import com.rabbitmq.client.{ AddressResolver, Connection, ConnectionFactory, ShutdownListener }
-import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
 
 import scala.concurrent.{ ExecutionContext, Future }
 import scala.concurrent.duration._
 import scala.util.control.NonFatal
-import org.scalatest.matchers.should.Matchers
-import org.scalatest.wordspec.AnyWordSpec
 
 /**
  * see [[https://github.com/akka/alpakka/issues/883]] and
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 7045b8ddc..d87864217 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -125,7 +125,8 @@ object Dependencies {
 
   val Amqp = Seq(
     libraryDependencies ++= Seq(
-      "com.rabbitmq" % "amqp-client" % "5.21.0") ++ Mockito)
+      "com.rabbitmq" % "amqp-client" % "5.21.0",
+      "org.scalatestplus" %% scalaTestScalaCheckArtifact % scalaTestScalaCheckVersion % Test) ++ Mockito)
 
   val AwsSpiPekkoHttp = Seq(
     libraryDependencies ++= Seq(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to