This is an automated email from the ASF dual-hosted git repository.
engelen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new 0964a9b53 AMQP: Avoid allocating a new ByteString on each consumed
message (#592)
0964a9b53 is described below
commit 0964a9b53177b06e4bf5763994a21288392e0693
Author: Diogo Moura <[email protected]>
AuthorDate: Mon Jun 24 09:24:37 2024 +0100
AMQP: Avoid allocating a new ByteString on each consumed message (#592)
* Avoid allocating a new ByteString on each consumed message
Changes the ByteString constructor call to `fromArrayUnsafe`. It should be
safe to use this unsafe method as long as `body` is not exposed to the outside.
* add avoidArrayCopy setting
* Create avoidArrayCopy.backwards.excludes
* support avoidArrayCopy on write cases
* add some testing for avoidArrayCopy
* spelling
* Update AmqpConnectorsSpec.scala
* revert changes in one test
* update tests
* Update AmqpFlowTest.java
* reformat if stmts
* compile issue
* rename param
---------
Co-authored-by: PJ Fanning <[email protected]>
---
.../avoidArrayCopy.backwards.excludes | 23 +
.../connectors/amqp/AmqpConnectorSettings.scala | 54 +-
.../impl/AbstractAmqpAsyncFlowStageLogic.scala | 6 +-
.../amqp/impl/AmqpReplyToSinkStage.scala | 6 +-
.../connectors/amqp/impl/AmqpRpcFlowStage.scala | 14 +-
.../connectors/amqp/impl/AmqpSimpleFlowStage.scala | 6 +-
.../connectors/amqp/impl/AmqpSourceStage.scala | 9 +-
.../connectors/amqp/javadsl/AmqpFlowTest.java | 24 +-
.../scaladsl/AmqpConnectionProvidersSpec.scala | 4 +-
.../amqp/scaladsl/AmqpConnectorsSpec.scala | 563 +++++++++++----------
...AmqpGraphStageLogicConnectionShutdownSpec.scala | 6 +-
project/Dependencies.scala | 3 +-
12 files changed, 430 insertions(+), 288 deletions(-)
diff --git
a/amqp/src/main/mima-filters/1.1.x.backward.excludes/avoidArrayCopy.backwards.excludes
b/amqp/src/main/mima-filters/1.1.x.backward.excludes/avoidArrayCopy.backwards.excludes
new file mode 100644
index 000000000..641c87ec8
--- /dev/null
+++
b/amqp/src/main/mima-filters/1.1.x.backward.excludes/avoidArrayCopy.backwards.excludes
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# added reuseByteArray setting
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.connectors.amqp.AmqpConnectorSettings.reuseByteArray")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.amqp.AmqpReplyToSinkSettings.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.amqp.AmqpWriteSettings.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.amqp.NamedQueueSourceSettings.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.amqp.TemporaryQueueSourceSettings.this")
diff --git
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectorSettings.scala
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectorSettings.scala
index 5ffc9e74c..f9278353a 100644
---
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectorSettings.scala
+++
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectorSettings.scala
@@ -28,6 +28,7 @@ import scala.concurrent.duration._
sealed trait AmqpConnectorSettings {
def connectionProvider: AmqpConnectionProvider
def declarations: immutable.Seq[Declaration]
+ def reuseByteArray: Boolean
}
sealed trait AmqpSourceSettings extends AmqpConnectorSettings
@@ -40,7 +41,8 @@ final class NamedQueueSourceSettings private (
val exclusive: Boolean = false,
val ackRequired: Boolean = true,
val consumerTag: String = "default",
- val arguments: Map[String, AnyRef] = Map.empty) extends AmqpSourceSettings
{
+ val arguments: Map[String, AnyRef] = Map.empty,
+ val reuseByteArray: Boolean = false) extends AmqpSourceSettings {
def withDeclaration(declaration: Declaration): NamedQueueSourceSettings =
copy(declarations = immutable.Seq(declaration))
@@ -79,12 +81,16 @@ final class NamedQueueSourceSettings private (
def withArguments(arguments: java.util.Map[String, Object]):
NamedQueueSourceSettings =
copy(arguments = arguments.asScala.toMap)
+ def withReuseByteArray(reuseByteArray: Boolean): NamedQueueSourceSettings =
+ copy(reuseByteArray = reuseByteArray)
+
private def copy(declarations: immutable.Seq[Declaration] = declarations,
noLocal: Boolean = noLocal,
exclusive: Boolean = exclusive,
ackRequired: Boolean = ackRequired,
consumerTag: String = consumerTag,
- arguments: Map[String, AnyRef] = arguments) =
+ arguments: Map[String, AnyRef] = arguments,
+ reuseByteArray: Boolean = reuseByteArray) =
new NamedQueueSourceSettings(
connectionProvider,
queue,
@@ -93,7 +99,8 @@ final class NamedQueueSourceSettings private (
exclusive = exclusive,
ackRequired = ackRequired,
consumerTag = consumerTag,
- arguments = arguments)
+ arguments = arguments,
+ reuseByteArray = reuseByteArray)
override def toString: String =
"NamedQueueSourceSettings(" +
@@ -105,6 +112,7 @@ final class NamedQueueSourceSettings private (
s"ackRequired=$ackRequired, " +
s"consumerTag=$consumerTag, " +
s"arguments=$arguments" +
+ s"reuseByteArray=$reuseByteArray" +
")"
}
@@ -123,7 +131,8 @@ final class TemporaryQueueSourceSettings private (
val connectionProvider: AmqpConnectionProvider,
val exchange: String,
val declarations: immutable.Seq[Declaration] = Nil,
- val routingKey: Option[String] = None) extends AmqpSourceSettings {
+ val routingKey: Option[String] = None,
+ val reuseByteArray: Boolean = false) extends AmqpSourceSettings {
def withDeclaration(declaration: Declaration): TemporaryQueueSourceSettings =
copy(declarations = immutable.Seq(declaration))
@@ -139,8 +148,14 @@ final class TemporaryQueueSourceSettings private (
def withRoutingKey(routingKey: String): TemporaryQueueSourceSettings =
copy(routingKey = Some(routingKey))
- private def copy(declarations: immutable.Seq[Declaration] = declarations,
routingKey: Option[String] = routingKey) =
- new TemporaryQueueSourceSettings(connectionProvider, exchange,
declarations = declarations, routingKey = routingKey)
+ def withReuseByteArray(reuseByteArray: Boolean):
TemporaryQueueSourceSettings =
+ copy(reuseByteArray = reuseByteArray)
+
+ private def copy(declarations: immutable.Seq[Declaration] = declarations,
+ routingKey: Option[String] = routingKey,
+ reuseByteArray: Boolean = reuseByteArray) =
+ new TemporaryQueueSourceSettings(connectionProvider, exchange,
declarations = declarations,
+ routingKey = routingKey, reuseByteArray = reuseByteArray)
override def toString: String =
"TemporaryQueueSourceSettings(" +
@@ -148,6 +163,7 @@ final class TemporaryQueueSourceSettings private (
s"exchange=$exchange, " +
s"declarations=$declarations, " +
s"routingKey=$routingKey" +
+ s"reuseByteArray=$reuseByteArray" +
")"
}
@@ -164,19 +180,26 @@ object TemporaryQueueSourceSettings {
final class AmqpReplyToSinkSettings private (
val connectionProvider: AmqpConnectionProvider,
- val failIfReplyToMissing: Boolean = true) extends AmqpConnectorSettings {
+ val failIfReplyToMissing: Boolean = true,
+ val reuseByteArray: Boolean = false) extends AmqpConnectorSettings {
override final val declarations = Nil
def withFailIfReplyToMissing(failIfReplyToMissing: Boolean):
AmqpReplyToSinkSettings =
copy(failIfReplyToMissing = failIfReplyToMissing)
- private def copy(connectionProvider: AmqpConnectionProvider =
connectionProvider, failIfReplyToMissing: Boolean) =
- new AmqpReplyToSinkSettings(connectionProvider, failIfReplyToMissing)
+ def withReuseByteArray(reuseByteArray: Boolean): AmqpReplyToSinkSettings =
+ copy(reuseByteArray = reuseByteArray)
+
+ private def copy(connectionProvider: AmqpConnectionProvider =
connectionProvider,
+ failIfReplyToMissing: Boolean = failIfReplyToMissing,
+ reuseByteArray: Boolean = reuseByteArray) =
+ new AmqpReplyToSinkSettings(connectionProvider, failIfReplyToMissing,
reuseByteArray)
override def toString: String =
"AmqpReplyToSinkSettings(" +
s"connectionProvider=$connectionProvider, " +
s"failIfReplyToMissing=$failIfReplyToMissing" +
+ s"reuseByteArray=$reuseByteArray" +
")"
}
@@ -197,7 +220,8 @@ final class AmqpWriteSettings private (
val routingKey: Option[String] = None,
val declarations: immutable.Seq[Declaration] = Nil,
val bufferSize: Int = 10,
- val confirmationTimeout: FiniteDuration = 100.millis) extends
AmqpConnectorSettings {
+ val confirmationTimeout: FiniteDuration = 100.millis,
+ val reuseByteArray: Boolean = false) extends AmqpConnectorSettings {
def withExchange(exchange: String): AmqpWriteSettings =
copy(exchange = Some(exchange))
@@ -211,6 +235,9 @@ final class AmqpWriteSettings private (
def withDeclarations(declarations: immutable.Seq[Declaration]):
AmqpWriteSettings =
copy(declarations = declarations)
+ def withReuseByteArray(reuseByteArray: Boolean): AmqpWriteSettings =
+ copy(reuseByteArray = reuseByteArray)
+
/**
* Java API
*/
@@ -234,8 +261,10 @@ final class AmqpWriteSettings private (
routingKey: Option[String] = routingKey,
declarations: immutable.Seq[Declaration] = declarations,
bufferSize: Int = bufferSize,
- confirmationTimeout: FiniteDuration = confirmationTimeout) =
- new AmqpWriteSettings(connectionProvider, exchange, routingKey,
declarations, bufferSize, confirmationTimeout)
+ confirmationTimeout: FiniteDuration = confirmationTimeout,
+ reuseByteArray: Boolean = reuseByteArray) =
+ new AmqpWriteSettings(connectionProvider, exchange, routingKey,
declarations, bufferSize,
+ confirmationTimeout, reuseByteArray)
override def toString: String =
"AmqpSinkSettings(" +
@@ -245,6 +274,7 @@ final class AmqpWriteSettings private (
s"declarations=$declarations, " +
s"bufferSize=$bufferSize, " +
s"confirmationTimeout=$confirmationTimeout" +
+ s"reuseByteArray=$reuseByteArray" +
")"
}
diff --git
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AbstractAmqpAsyncFlowStageLogic.scala
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AbstractAmqpAsyncFlowStageLogic.scala
index c8d5ae721..e126289f5 100644
---
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AbstractAmqpAsyncFlowStageLogic.scala
+++
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AbstractAmqpAsyncFlowStageLogic.scala
@@ -164,13 +164,17 @@ import scala.concurrent.Promise
log.debug("Publishing message {} with deliveryTag {}.", message, tag)
+ val bytes = if (settings.reuseByteArray)
+ message.bytes.toArrayUnsafe()
+ else
+ message.bytes.toArray
channel.basicPublish(
exchange,
message.routingKey.getOrElse(routingKey),
message.mandatory,
message.immediate,
message.properties.orNull,
- message.bytes.toArray)
+ bytes)
tag
}
diff --git
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpReplyToSinkStage.scala
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpReplyToSinkStage.scala
index e012e5e1d..43f2db389 100644
---
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpReplyToSinkStage.scala
+++
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpReplyToSinkStage.scala
@@ -75,13 +75,17 @@ private[amqp] final class
AmqpReplyToSinkStage(replyToSinkSettings: AmqpReplyToS
val replyTo = elem.properties.flatMap(properties =>
Option(properties.getReplyTo))
if (replyTo.isDefined) {
+ val bytes = if (settings.reuseByteArray)
+ elem.bytes.toArrayUnsafe()
+ else
+ elem.bytes.toArray
channel.basicPublish(
elem.routingKey.getOrElse(""),
replyTo.get,
elem.mandatory,
elem.immediate,
elem.properties.orNull,
- elem.bytes.toArray)
+ bytes)
} else if (settings.failIfReplyToMissing) {
onFailure(new RuntimeException("Reply-to header was not set"))
}
diff --git
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpRpcFlowStage.scala
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpRpcFlowStage.scala
index 289ff27eb..4c5b453eb 100644
---
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpRpcFlowStage.scala
+++
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpRpcFlowStage.scala
@@ -105,7 +105,13 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings:
AmqpWriteSettings, buf
body: Array[Byte]): Unit =
consumerCallback.invoke(
new CommittableReadResult {
- override val message = ReadResult(ByteString(body),
envelope, properties)
+ override val message = {
+ val byteString = if (settings.reuseByteArray)
+ ByteString.fromArrayUnsafe(body)
+ else
+ ByteString(body)
+ ReadResult(byteString, envelope, properties)
+ }
override def ack(multiple: Boolean): Future[Done] = {
val promise = Promise[Done]()
@@ -196,6 +202,10 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings:
AmqpWriteSettings, buf
override def onPush(): Unit = {
val elem = grab(in)
+ val bytes = if (settings.reuseByteArray)
+ elem.bytes.toArrayUnsafe()
+ else
+ elem.bytes.toArray
val props = elem.properties.getOrElse(new
BasicProperties()).builder.replyTo(queueName).build()
channel.basicPublish(
exchange,
@@ -203,7 +213,7 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings:
AmqpWriteSettings, buf
elem.mandatory,
elem.immediate,
props,
- elem.bytes.toArray)
+ bytes)
val expectedResponses: Int = {
val headers = props.getHeaders
diff --git
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSimpleFlowStage.scala
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSimpleFlowStage.scala
index 5d24b4447..01333c133 100644
---
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSimpleFlowStage.scala
+++
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSimpleFlowStage.scala
@@ -49,13 +49,17 @@ import scala.concurrent.{ Future, Promise }
override def publish(message: WriteMessage, passThrough: T): Unit = {
log.debug("Publishing message {}.", message)
+ val bytes = if (settings.reuseByteArray)
+ message.bytes.toArrayUnsafe()
+ else
+ message.bytes.toArray
channel.basicPublish(
settings.exchange.getOrElse(""),
message.routingKey.orElse(settings.routingKey).getOrElse(""),
message.mandatory,
message.immediate,
message.properties.orNull,
- message.bytes.toArray)
+ bytes)
push(out, (WriteResult.confirmed, passThrough))
}
}, streamCompletion.future)
diff --git
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSourceStage.scala
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSourceStage.scala
index 2e950ef95..63ddfb79d 100644
---
a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSourceStage.scala
+++
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSourceStage.scala
@@ -93,10 +93,15 @@ private[amqp] final class AmqpSourceStage(settings:
AmqpSourceSettings, bufferSi
envelope: Envelope,
properties: BasicProperties,
body: Array[Byte]): Unit = {
+ val byteString = if (settings.reuseByteArray)
+ ByteString.fromArrayUnsafe(body)
+ else
+ ByteString(body)
+
val message = if (ackRequired) {
new CommittableReadResult {
- override val message = ReadResult(ByteString(body), envelope,
properties)
+ override val message = ReadResult(byteString, envelope,
properties)
override def ack(multiple: Boolean): Future[Done] = {
val promise = Promise[Done]()
@@ -110,7 +115,7 @@ private[amqp] final class AmqpSourceStage(settings:
AmqpSourceSettings, bufferSi
promise.future
}
}
- } else new AutoAckedReadResult(ReadResult(ByteString(body),
envelope, properties))
+ } else new AutoAckedReadResult(ReadResult(byteString, envelope,
properties))
consumerCallback.invoke(message)
}
diff --git
a/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpFlowTest.java
b/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpFlowTest.java
index f7a673823..06f5e435f 100644
---
a/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpFlowTest.java
+++
b/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpFlowTest.java
@@ -19,10 +19,15 @@ import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
-import org.apache.pekko.stream.connectors.testkit.javadsl.LogCapturingJunit4;
+import scala.collection.JavaConverters;
+
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
import org.apache.pekko.Done;
import org.apache.pekko.actor.ActorSystem;
@@ -32,6 +37,7 @@ import
org.apache.pekko.stream.connectors.amqp.AmqpWriteSettings;
import org.apache.pekko.stream.connectors.amqp.QueueDeclaration;
import org.apache.pekko.stream.connectors.amqp.WriteMessage;
import org.apache.pekko.stream.connectors.amqp.WriteResult;
+import org.apache.pekko.stream.connectors.testkit.javadsl.LogCapturingJunit4;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.FlowWithContext;
import org.apache.pekko.stream.javadsl.Keep;
@@ -39,11 +45,22 @@ import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.testkit.TestSubscriber;
import org.apache.pekko.stream.testkit.javadsl.TestSink;
import org.apache.pekko.util.ByteString;
-import scala.collection.JavaConverters;
/** Needs a local running AMQP server on the default port with no password. */
+@RunWith(Parameterized.class)
public class AmqpFlowTest {
+ @Parameters
+ public static Iterable<? extends Object> data() {
+ return Arrays.asList(false, true);
+ }
+
+ /**
+ * This value is initialized with values from data() array
+ */
+ @Parameter
+ public boolean reuseByteArray;
+
@Rule public final LogCapturingJunit4 logCapturing = new
LogCapturingJunit4();
private static ActorSystem system;
@@ -53,13 +70,14 @@ public class AmqpFlowTest {
system = ActorSystem.create();
}
- private static AmqpWriteSettings settings() {
+ private AmqpWriteSettings settings() {
final String queueName = "amqp-flow-spec" + System.currentTimeMillis();
final QueueDeclaration queueDeclaration =
QueueDeclaration.create(queueName);
return AmqpWriteSettings.create(AmqpLocalConnectionProvider.getInstance())
.withRoutingKey(queueName)
.withDeclaration(queueDeclaration)
+ .withReuseByteArray(reuseByteArray)
.withBufferSize(10)
.withConfirmationTimeout(Duration.ofMillis(200));
}
diff --git
a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectionProvidersSpec.scala
b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectionProvidersSpec.scala
index 0f6b9ef44..1770cf9de 100644
---
a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectionProvidersSpec.scala
+++
b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectionProvidersSpec.scala
@@ -200,9 +200,9 @@ class AmqpConnectionProvidersSpec extends AmqpSpec {
val connectionProvider = AmqpDetailsConnectionProvider("localhost", 5673)
val reusableConnectionProvider =
AmqpCachedConnectionProvider(connectionProvider).withAutomaticRelease(false)
try reusableConnectionProvider.get
- catch { case e: Throwable => e shouldBe an[ConnectException] }
+ catch { case e: Throwable => e shouldBe a[ConnectException] }
try reusableConnectionProvider.get
- catch { case e: Throwable => e shouldBe an[ConnectException] }
+ catch { case e: Throwable => e shouldBe a[ConnectException] }
}
}
}
diff --git
a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
index a3a73c52e..dad8f3bb7 100644
---
a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
+++
b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
@@ -26,6 +26,7 @@ import pekko.stream.testkit.{ TestPublisher, TestSubscriber }
import pekko.util.ByteString
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client.AuthenticationFailureException
+import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks
import scala.concurrent.Await
import scala.concurrent.duration._
@@ -34,7 +35,7 @@ import scala.collection.immutable
/**
* Needs a local running AMQP server on the default port with no password.
*/
-class AmqpConnectorsSpec extends AmqpSpec {
+class AmqpConnectorsSpec extends AmqpSpec with ScalaCheckDrivenPropertyChecks {
override implicit val patienceConfig: PatienceConfig =
PatienceConfig(10.seconds)
@@ -43,78 +44,89 @@ class AmqpConnectorsSpec extends AmqpSpec {
val connectionProvider = AmqpLocalConnectionProvider
"connection should fail to wrong broker" in assertAllStagesStopped {
- val connectionProvider = AmqpDetailsConnectionProvider("localhost", 5673)
+ forAll { (reuseByteArray: Boolean) =>
+ val connectionProvider = AmqpDetailsConnectionProvider("localhost",
5673)
- val queueName = "amqp-conn-it-spec-simple-queue-" +
System.currentTimeMillis()
- val queueDeclaration = QueueDeclaration(queueName)
+ val queueName = "amqp-conn-it-spec-simple-queue-" +
System.currentTimeMillis()
+ val queueDeclaration = QueueDeclaration(queueName)
- val amqpSink = AmqpSink.simple(
- AmqpWriteSettings(connectionProvider)
- .withRoutingKey(queueName)
- .withDeclaration(queueDeclaration))
+ val amqpSink = AmqpSink.simple(
+ AmqpWriteSettings(connectionProvider)
+ .withRoutingKey(queueName)
+ .withDeclaration(queueDeclaration)
+ .withReuseByteArray(reuseByteArray))
- val input = Vector("one", "two", "three", "four", "five")
- val result = Source(input).map(s => ByteString(s)).runWith(amqpSink)
- result.failed.futureValue shouldBe an[ConnectException]
+ val input = Vector("one", "two", "three", "four", "five")
+ val result = Source(input).map(s => ByteString(s)).runWith(amqpSink)
+ result.failed.futureValue shouldBe a[ConnectException]
+ }
}
"connection should fail with wrong credentials" in assertAllStagesStopped {
- val connectionProvider =
- AmqpDetailsConnectionProvider("invalid", 5673)
- .withHostsAndPorts(immutable.Seq("localhost" -> 5672))
- .withCredentials(AmqpCredentials("guest", "guest1"))
-
- val queueName = "amqp-conn-it-spec-simple-queue-" +
System.currentTimeMillis()
- val queueDeclaration = QueueDeclaration(queueName)
-
- val amqpSink = AmqpSink.simple(
- AmqpWriteSettings(connectionProvider)
- .withRoutingKey(queueName)
- .withDeclaration(queueDeclaration))
-
- val input = Vector("one", "two", "three", "four", "five")
- val result = Source(input).map(s => ByteString(s)).runWith(amqpSink)
- result.failed.futureValue shouldBe an[AuthenticationFailureException]
+ forAll { (reuseByteArray: Boolean) =>
+ val connectionProvider =
+ AmqpDetailsConnectionProvider("invalid", 5673)
+ .withHostsAndPorts(immutable.Seq("localhost" -> 5672))
+ .withCredentials(AmqpCredentials("guest", "guest1"))
+
+ val queueName = "amqp-conn-it-spec-simple-queue-" +
System.currentTimeMillis()
+ val queueDeclaration = QueueDeclaration(queueName)
+
+ val amqpSink = AmqpSink.simple(
+ AmqpWriteSettings(connectionProvider)
+ .withRoutingKey(queueName)
+ .withDeclaration(queueDeclaration)
+ .withReuseByteArray(reuseByteArray))
+
+ val input = Vector("one", "two", "three", "four", "five")
+ val result = Source(input).map(s => ByteString(s)).runWith(amqpSink)
+ result.failed.futureValue shouldBe an[AuthenticationFailureException]
+ }
}
"publish via RPC which expects 2 responses per message and then consume
through a simple queue again in the same JVM" in assertAllStagesStopped {
- val queueName = "amqp-conn-it-spec-rpc-queue-" +
System.currentTimeMillis()
- val queueDeclaration = QueueDeclaration(queueName)
-
- val amqpRpcFlow = AmqpRpcFlow.simple(
- AmqpWriteSettings(connectionProvider)
- .withRoutingKey(queueName)
- .withDeclaration(queueDeclaration),
- 2)
-
- val amqpSource = AmqpSource.atMostOnceSource(
- NamedQueueSourceSettings(connectionProvider, queueName),
- bufferSize = 1)
-
- val input = Vector("one", "two", "three", "four", "five")
- val (rpcQueueF, probe) =
- Source(input).map(s =>
ByteString(s)).viaMat(amqpRpcFlow)(Keep.right).toMat(TestSink.probe)(Keep.both).run()
- rpcQueueF.futureValue
-
- val amqpSink = AmqpSink.replyTo(
- AmqpReplyToSinkSettings(connectionProvider))
-
- val sourceToSink = amqpSource
- .viaMat(KillSwitches.single)(Keep.right)
- .mapConcat { b =>
- List(
-
WriteMessage(b.bytes.concat(ByteString("a"))).withProperties(b.properties),
-
WriteMessage(b.bytes.concat(ByteString("aa"))).withProperties(b.properties))
- }
- .to(amqpSink)
- .run()
+ forAll { (reuseByteArray: Boolean) =>
+ val queueName = "amqp-conn-it-spec-rpc-queue-" +
System.currentTimeMillis()
+ val queueDeclaration = QueueDeclaration(queueName)
+
+ val amqpRpcFlow = AmqpRpcFlow.simple(
+ AmqpWriteSettings(connectionProvider)
+ .withRoutingKey(queueName)
+ .withDeclaration(queueDeclaration)
+ .withReuseByteArray(reuseByteArray),
+ 2)
+
+ val amqpSource = AmqpSource.atMostOnceSource(
+ NamedQueueSourceSettings(connectionProvider, queueName)
+ .withReuseByteArray(reuseByteArray),
+ bufferSize = 1)
+
+ val input = Vector("one", "two", "three", "four", "five")
+ val (rpcQueueF, probe) =
+ Source(input).map(s =>
ByteString(s)).viaMat(amqpRpcFlow)(Keep.right).toMat(TestSink.probe)(Keep.both).run()
+ rpcQueueF.futureValue
+
+ val amqpSink = AmqpSink.replyTo(
+ AmqpReplyToSinkSettings(connectionProvider)
+ .withReuseByteArray(reuseByteArray))
+
+ val sourceToSink = amqpSource
+ .viaMat(KillSwitches.single)(Keep.right)
+ .mapConcat { b =>
+ List(
+
WriteMessage(b.bytes.concat(ByteString("a"))).withProperties(b.properties),
+
WriteMessage(b.bytes.concat(ByteString("aa"))).withProperties(b.properties))
+ }
+ .to(amqpSink)
+ .run()
- probe
- .request(10)
- .expectNextUnorderedN(input.flatMap(s =>
List(ByteString(s.concat("a")), ByteString(s.concat("aa")))))
- .expectComplete()
+ probe
+ .request(10)
+ .expectNextUnorderedN(input.flatMap(s =>
List(ByteString(s.concat("a")), ByteString(s.concat("aa")))))
+ .expectComplete()
- sourceToSink.shutdown()
+ sourceToSink.shutdown()
+ }
}
"correctly close a AmqpRpcFlow when stream is closed without passing any
elements" in assertAllStagesStopped {
@@ -163,252 +175,283 @@ class AmqpConnectorsSpec extends AmqpSpec {
Keep.right)
.run()
.futureValue shouldBe Done
-
}
"publish from one source and consume elements with multiple sinks" in
assertAllStagesStopped {
- val queueName = "amqp-conn-it-spec-work-queues-" + System.currentTimeMillis()
- val queueDeclaration = QueueDeclaration(queueName)
- val amqpSink = AmqpSink.simple(
- AmqpWriteSettings(connectionProvider)
- .withRoutingKey(queueName)
- .withDeclaration(queueDeclaration))
-
- val input = Vector("one", "two", "three", "four", "five")
- Source(input).map(s => ByteString(s)).runWith(amqpSink)
-
- val mergedSources = Source.fromGraph(GraphDSL.create() { implicit b =>
- import GraphDSL.Implicits._
- val count = 3
- val merge = b.add(Merge[ReadResult](count))
- for (n <- 0 until count) {
- val source = b.add(
- AmqpSource.atMostOnceSource(
- NamedQueueSourceSettings(connectionProvider, queueName)
- .withDeclaration(queueDeclaration),
- bufferSize = 1))
- source.out ~> merge.in(n)
- }
-
- SourceShape(merge.out)
- })
-
- val result = mergedSources.map(_.bytes.utf8String).take(input.size).runWith(Sink.seq)
-
- result.futureValue.sorted shouldEqual input.sorted
+ forAll { (reuseByteArray: Boolean) =>
+ val queueName = "amqp-conn-it-spec-work-queues-" + System.currentTimeMillis()
+ val queueDeclaration = QueueDeclaration(queueName)
+ val amqpSink = AmqpSink.simple(
+ AmqpWriteSettings(connectionProvider)
+ .withRoutingKey(queueName)
+ .withDeclaration(queueDeclaration)
+ .withReuseByteArray(reuseByteArray))
+
+ val input = Vector("one", "two", "three", "four", "five")
+ Source(input).map(s => ByteString(s)).runWith(amqpSink)
+
+ val mergedSources = Source.fromGraph(GraphDSL.create() { implicit b =>
+ import GraphDSL.Implicits._
+ val count = 3
+ val merge = b.add(Merge[ReadResult](count))
+ for (n <- 0 until count) {
+ val source = b.add(
+ AmqpSource.atMostOnceSource(
+ NamedQueueSourceSettings(connectionProvider, queueName)
+ .withDeclaration(queueDeclaration)
+ .withReuseByteArray(reuseByteArray),
+ bufferSize = 1))
+ source.out ~> merge.in(n)
+ }
+
+ SourceShape(merge.out)
+ })
+
+ val result = mergedSources.map(_.bytes.utf8String).take(input.size).runWith(Sink.seq)
+
+ result.futureValue.sorted shouldEqual input.sorted
+ }
}
"not fail on a fast producer and a slow consumer" in
assertAllStagesStopped {
- val queueName = "amqp-conn-it-spec-simple-queue-2-" + System.currentTimeMillis()
- val queueDeclaration = QueueDeclaration(queueName)
- val amqpSource = AmqpSource.atMostOnceSource(
- NamedQueueSourceSettings(connectionProvider, queueName).withDeclaration(queueDeclaration),
- bufferSize = 2)
+ forAll { (reuseByteArray: Boolean) =>
+ val queueName = "amqp-conn-it-spec-simple-queue-2-" + System.currentTimeMillis()
+ val queueDeclaration = QueueDeclaration(queueName)
+ val amqpSource = AmqpSource.atMostOnceSource(
+ NamedQueueSourceSettings(connectionProvider, queueName)
+ .withDeclaration(queueDeclaration)
+ .withReuseByteArray(reuseByteArray),
+ bufferSize = 2)
- val amqpSink = AmqpSink.simple(
- AmqpWriteSettings(connectionProvider)
- .withRoutingKey(queueName)
- .withDeclaration(queueDeclaration))
+ val amqpSink = AmqpSink.simple(
+ AmqpWriteSettings(connectionProvider)
+ .withRoutingKey(queueName)
+ .withDeclaration(queueDeclaration)
+ .withReuseByteArray(reuseByteArray))
- val publisher = TestPublisher.probe[ByteString]()
- val subscriber = TestSubscriber.probe[ReadResult]()
- Source.fromPublisher(publisher).to(amqpSink).addAttributes(Attributes.inputBuffer(1, 1)).run()
- amqpSource.to(Sink.fromSubscriber(subscriber)).addAttributes(Attributes.inputBuffer(1, 1)).run()
+ val publisher = TestPublisher.probe[ByteString]()
+ val subscriber = TestSubscriber.probe[ReadResult]()
+ Source.fromPublisher(publisher).to(amqpSink).addAttributes(Attributes.inputBuffer(1, 1)).run()
+ amqpSource.to(Sink.fromSubscriber(subscriber)).addAttributes(Attributes.inputBuffer(1, 1)).run()
- // note that this essentially is testing rabbitmq just as much as it tests our sink and source
- publisher.ensureSubscription()
- subscriber.ensureSubscription()
+ // note that this essentially is testing rabbitmq just as much as it tests our sink and source
+ publisher.ensureSubscription()
+ subscriber.ensureSubscription()
- publisher.expectRequest() shouldEqual 1
- publisher.sendNext(ByteString("one"))
+ publisher.expectRequest() shouldEqual 1
+ publisher.sendNext(ByteString("one"))
- publisher.expectRequest()
- publisher.sendNext(ByteString("two"))
+ publisher.expectRequest()
+ publisher.sendNext(ByteString("two"))
- publisher.expectRequest()
- publisher.sendNext(ByteString("three"))
+ publisher.expectRequest()
+ publisher.sendNext(ByteString("three"))
- publisher.expectRequest()
- publisher.sendNext(ByteString("four"))
+ publisher.expectRequest()
+ publisher.sendNext(ByteString("four"))
- publisher.expectRequest()
- publisher.sendNext(ByteString("five"))
+ publisher.expectRequest()
+ publisher.sendNext(ByteString("five"))
- subscriber.request(4)
- subscriber.expectNext().bytes.utf8String shouldEqual "one"
- subscriber.expectNext().bytes.utf8String shouldEqual "two"
- subscriber.expectNext().bytes.utf8String shouldEqual "three"
- subscriber.expectNext().bytes.utf8String shouldEqual "four"
+ subscriber.request(4)
+ subscriber.expectNext().bytes.utf8String shouldEqual "one"
+ subscriber.expectNext().bytes.utf8String shouldEqual "two"
+ subscriber.expectNext().bytes.utf8String shouldEqual "three"
+ subscriber.expectNext().bytes.utf8String shouldEqual "four"
- subscriber.request(1)
- subscriber.expectNext().bytes.utf8String shouldEqual "five"
+ subscriber.request(1)
+ subscriber.expectNext().bytes.utf8String shouldEqual "five"
- subscriber.cancel()
- publisher.sendComplete()
- succeed
+ subscriber.cancel()
+ publisher.sendComplete()
+ succeed
+ }
}
"keep connection open if downstream closes and there are pending acks" in
assertAllStagesStopped {
- val connectionSettings = AmqpDetailsConnectionProvider("localhost", 5672)
+ forAll { (reuseByteArray: Boolean) =>
+ val connectionSettings = AmqpDetailsConnectionProvider("localhost", 5672)
+
+ val queueName = "amqp-conn-it-spec-simple-queue-" + System.currentTimeMillis()
+ val queueDeclaration = QueueDeclaration(queueName)
+
+ val amqpSink = AmqpSink.simple(
+ AmqpWriteSettings(connectionSettings)
+ .withRoutingKey(queueName)
+ .withDeclaration(queueDeclaration)
+ .withReuseByteArray(reuseByteArray))
+
+ val amqpSource = AmqpSource.committableSource(
+ NamedQueueSourceSettings(connectionSettings, queueName)
+ .withDeclaration(queueDeclaration)
+ .withReuseByteArray(reuseByteArray),
+ bufferSize = 10)
- val queueName = "amqp-conn-it-spec-simple-queue-" + System.currentTimeMillis()
- val queueDeclaration = QueueDeclaration(queueName)
+ val input = Vector("one", "two", "three", "four", "five")
+ Source(input).map(s => ByteString(s)).runWith(amqpSink).futureValue shouldEqual Done
- val amqpSink = AmqpSink.simple(
- AmqpWriteSettings(connectionSettings)
- .withRoutingKey(queueName)
- .withDeclaration(queueDeclaration))
+ val result = amqpSource
+ .take(input.size)
+ .runWith(Sink.seq)
- val amqpSource = AmqpSource.committableSource(
- NamedQueueSourceSettings(connectionSettings, queueName).withDeclaration(queueDeclaration),
- bufferSize = 10)
+ result.futureValue.map(cm => {
+ noException should be thrownBy cm.ack().futureValue
+ })
+ }
+ }
- val input = Vector("one", "two", "three", "four", "five")
- Source(input).map(s => ByteString(s)).runWith(amqpSink).futureValue shouldEqual Done
+ "not republish message without autoAck(false) if nack is sent" in assertAllStagesStopped {
+ forAll { (reuseByteArray: Boolean) =>
+ val queueName = "amqp-conn-it-spec-simple-queue-" + System.currentTimeMillis()
+ val queueDeclaration = QueueDeclaration(queueName)
+
+ val amqpSink = AmqpSink.simple(
+ AmqpWriteSettings(connectionProvider)
+ .withRoutingKey(queueName)
+ .withDeclaration(queueDeclaration)
+ .withReuseByteArray(reuseByteArray))
+ val input = Vector("one", "two", "three", "four", "five")
+ Source(input).map(s => ByteString(s)).runWith(amqpSink).futureValue shouldEqual Done
+
+ val amqpSource = AmqpSource.committableSource(
+ NamedQueueSourceSettings(connectionProvider, queueName)
+ .withDeclaration(queueDeclaration)
+ .withReuseByteArray(reuseByteArray),
+ bufferSize = 10)
- val result = amqpSource
- .take(input.size)
- .runWith(Sink.seq)
+ val result1 = amqpSource
+ .mapAsync(1)(cm => cm.nack(requeue = false).map(_ => cm))
+ .take(input.size)
+ .runWith(Sink.seq)
- result.futureValue.map(cm => {
- noException should be thrownBy cm.ack().futureValue
- })
- }
+ Await.ready(result1, 3.seconds)
- "not republish message without autoAck(false) if nack is sent" in assertAllStagesStopped {
- val queueName = "amqp-conn-it-spec-simple-queue-" + System.currentTimeMillis()
- val queueDeclaration = QueueDeclaration(queueName)
-
- val amqpSink = AmqpSink.simple(
- AmqpWriteSettings(connectionProvider)
- .withRoutingKey(queueName)
- .withDeclaration(queueDeclaration))
- val input = Vector("one", "two", "three", "four", "five")
- Source(input).map(s => ByteString(s)).runWith(amqpSink).futureValue shouldEqual Done
-
- val amqpSource = AmqpSource.committableSource(
- NamedQueueSourceSettings(connectionProvider, queueName).withDeclaration(queueDeclaration),
- bufferSize = 10)
-
- val result1 = amqpSource
- .mapAsync(1)(cm => cm.nack(requeue = false).map(_ => cm))
- .take(input.size)
- .runWith(Sink.seq)
-
- Await.ready(result1, 3.seconds)
-
- val (sourceToSeq, result2) = amqpSource
- .viaMat(KillSwitches.single)(Keep.right)
- .mapAsync(1)(cm => cm.ack().map(_ => cm))
- .take(input.size)
- .toMat(Sink.seq)(Keep.both)
- .run()
+ val (sourceToSeq, result2) = amqpSource
+ .viaMat(KillSwitches.single)(Keep.right)
+ .mapAsync(1)(cm => cm.ack().map(_ => cm))
+ .take(input.size)
+ .toMat(Sink.seq)(Keep.both)
+ .run()
- result2.isReadyWithin(1.second) shouldEqual false
- sourceToSeq.shutdown()
+ result2.isReadyWithin(1.second) shouldEqual false
+ sourceToSeq.shutdown()
+ }
}
"publish via RPC and then consume through a simple queue again in the same
JVM without autoAck" in assertAllStagesStopped {
- val queueName = "amqp-conn-it-spec-rpc-queue-" + System.currentTimeMillis()
- val queueDeclaration = QueueDeclaration(queueName)
+ forAll { (reuseByteArray: Boolean) =>
+ val queueName = "amqp-conn-it-spec-rpc-queue-" + System.currentTimeMillis()
+ val queueDeclaration = QueueDeclaration(queueName)
- val input = Vector("one", "two", "three", "four", "five")
+ val input = Vector("one", "two", "three", "four", "five")
- val amqpRpcFlow = AmqpRpcFlow.committableFlow(
- AmqpWriteSettings(connectionProvider)
- .withRoutingKey(queueName)
- .withDeclaration(queueDeclaration),
- bufferSize = 10)
- val (rpcQueueF, probe) =
- Source(input)
- .map(s => ByteString(s))
- .map(bytes => WriteMessage(bytes))
- .viaMat(amqpRpcFlow)(Keep.right)
- .mapAsync(1)(cm => cm.ack().map(_ => cm.message))
- .toMat(TestSink.probe)(Keep.both)
+ val amqpRpcFlow = AmqpRpcFlow.committableFlow(
+ AmqpWriteSettings(connectionProvider)
+ .withRoutingKey(queueName)
+ .withDeclaration(queueDeclaration)
+ .withReuseByteArray(reuseByteArray),
+ bufferSize = 10)
+ val (rpcQueueF, probe) =
+ Source(input)
+ .map(s => ByteString(s))
+ .map(bytes => WriteMessage(bytes))
+ .viaMat(amqpRpcFlow)(Keep.right)
+ .mapAsync(1)(cm => cm.ack().map(_ => cm.message))
+ .toMat(TestSink.probe)(Keep.both)
+ .run()
+ rpcQueueF.futureValue
+
+ val amqpSink = AmqpSink.replyTo(
+ AmqpReplyToSinkSettings(connectionProvider)
+ .withReuseByteArray(reuseByteArray))
+
+ val amqpSource = AmqpSource.atMostOnceSource(
+ NamedQueueSourceSettings(connectionProvider, queueName)
+ .withReuseByteArray(reuseByteArray),
+ bufferSize = 1)
+ val sourceToSink = amqpSource
+ .viaMat(KillSwitches.single)(Keep.right)
+ .map(b => WriteMessage(b.bytes).withProperties(b.properties))
+ .to(amqpSink)
.run()
- rpcQueueF.futureValue
-
- val amqpSink = AmqpSink.replyTo(
- AmqpReplyToSinkSettings(connectionProvider))
-
- val amqpSource = AmqpSource.atMostOnceSource(
- NamedQueueSourceSettings(connectionProvider, queueName),
- bufferSize = 1)
- val sourceToSink = amqpSource
- .viaMat(KillSwitches.single)(Keep.right)
- .map(b => WriteMessage(b.bytes).withProperties(b.properties))
- .to(amqpSink)
- .run()
- probe.toStrict(3.second).map(_.bytes.utf8String) shouldEqual input
- sourceToSink.shutdown()
+ probe.toStrict(3.seconds).map(_.bytes.utf8String) shouldEqual input
+ sourceToSink.shutdown()
+ }
}
"set routing key per message and consume them in the same JVM" in
assertAllStagesStopped {
def getRoutingKey(s: String) = s"key.$s"
- val exchangeName = "amqp.topic." + System.currentTimeMillis()
- val queueName = "amqp-conn-it-spec-simple-queue-" + System.currentTimeMillis()
- val exchangeDeclaration = ExchangeDeclaration(exchangeName, "topic")
- val queueDeclaration = QueueDeclaration(queueName)
- val bindingDeclaration = BindingDeclaration(queueName, exchangeName).withRoutingKey(getRoutingKey("*"))
-
- val amqpSink = AmqpSink(
- AmqpWriteSettings(connectionProvider)
- .withExchange(exchangeName)
- .withDeclarations(immutable.Seq(exchangeDeclaration, queueDeclaration, bindingDeclaration)))
-
- val amqpSource = AmqpSource.atMostOnceSource(
- NamedQueueSourceSettings(connectionProvider, queueName)
- .withDeclarations(immutable.Seq(exchangeDeclaration, queueDeclaration, bindingDeclaration)),
- bufferSize = 10)
-
- val input = Vector("one", "two", "three", "four", "five")
- val routingKeys = input.map(s => getRoutingKey(s))
- Source(input)
- .map(s => WriteMessage(ByteString(s)).withRoutingKey(getRoutingKey(s)))
- .runWith(amqpSink)
- .futureValue shouldEqual Done
-
- val result = amqpSource
- .take(input.size)
- .runWith(Sink.seq)
- .futureValue
-
- result.map(_.envelope.getRoutingKey) shouldEqual routingKeys
- result.map(_.bytes.utf8String) shouldEqual input
- }
-
- "declare connection that does not require server acks" in assertAllStagesStopped {
- val connectionProvider =
- AmqpDetailsConnectionProvider("localhost", 5672)
+ forAll { (reuseByteArray: Boolean) =>
+ val exchangeName = "amqp.topic." + System.currentTimeMillis()
+ val queueName = "amqp-conn-it-spec-simple-queue-" + System.currentTimeMillis()
+ val exchangeDeclaration = ExchangeDeclaration(exchangeName, "topic")
+ val queueDeclaration = QueueDeclaration(queueName)
+ val bindingDeclaration = BindingDeclaration(queueName, exchangeName).withRoutingKey(getRoutingKey("*"))
- val queueName = "amqp-conn-it-spec-fire-and-forget-" + System.currentTimeMillis()
- val queueDeclaration = QueueDeclaration(queueName)
+ val amqpSink = AmqpSink(
+ AmqpWriteSettings(connectionProvider)
+ .withExchange(exchangeName)
+ .withDeclarations(immutable.Seq(exchangeDeclaration, queueDeclaration, bindingDeclaration))
+ .withReuseByteArray(reuseByteArray))
- val amqpSink = AmqpSink.simple(
- AmqpWriteSettings(connectionProvider)
- .withRoutingKey(queueName)
- .withDeclaration(queueDeclaration))
-
- val amqpSource = AmqpSource
- .committableSource(
+ val amqpSource = AmqpSource.atMostOnceSource(
NamedQueueSourceSettings(connectionProvider, queueName)
- .withAckRequired(false)
- .withDeclaration(queueDeclaration),
+ .withDeclarations(immutable.Seq(exchangeDeclaration, queueDeclaration, bindingDeclaration))
+ .withReuseByteArray(reuseByteArray),
bufferSize = 10)
- val input = Vector("one", "two", "three", "four", "five")
- Source(input).map(s => ByteString(s)).runWith(amqpSink).futureValue shouldEqual Done
+ val input = Vector("one", "two", "three", "four", "five")
+ val routingKeys = input.map(s => getRoutingKey(s))
+ Source(input)
+ .map(s => WriteMessage(ByteString(s)).withRoutingKey(getRoutingKey(s)))
+ .runWith(amqpSink)
+ .futureValue shouldEqual Done
+
+ val result = amqpSource
+ .take(input.size)
+ .runWith(Sink.seq)
+ .futureValue
- val result = amqpSource
- .take(input.size)
- .runWith(Sink.seq)
+ result.map(_.envelope.getRoutingKey) shouldEqual routingKeys
+ result.map(_.bytes.utf8String) shouldEqual input
+ }
+ }
- val received = result.futureValue
- received.map(_.message.bytes.utf8String) shouldEqual input
+ "declare connection that does not require server acks" in assertAllStagesStopped {
+ val connectionProvider =
+ AmqpDetailsConnectionProvider("localhost", 5672)
+
+ forAll { (reuseByteArray: Boolean) =>
+ val queueName = "amqp-conn-it-spec-fire-and-forget-" + System.currentTimeMillis()
+ val queueDeclaration = QueueDeclaration(queueName)
+
+ val amqpSink = AmqpSink.simple(
+ AmqpWriteSettings(connectionProvider)
+ .withRoutingKey(queueName)
+ .withDeclaration(queueDeclaration)
+ .withReuseByteArray(reuseByteArray))
+
+ val amqpSource = AmqpSource
+ .committableSource(
+ NamedQueueSourceSettings(connectionProvider, queueName)
+ .withAckRequired(false)
+ .withDeclaration(queueDeclaration)
+ .withReuseByteArray(reuseByteArray),
+ bufferSize = 10)
+
+ val input = Vector("one", "two", "three", "four", "five")
+ Source(input).map(s => ByteString(s)).runWith(amqpSink).futureValue shouldEqual Done
+
+ val result = amqpSource
+ .take(input.size)
+ .runWith(Sink.seq)
+
+ val received = result.futureValue
+ received.map(_.message.bytes.utf8String) shouldEqual input
+ }
}
}
}
diff --git a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpGraphStageLogicConnectionShutdownSpec.scala b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpGraphStageLogicConnectionShutdownSpec.scala
index 23be429ca..b01678a1a 100644
--- a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpGraphStageLogicConnectionShutdownSpec.scala
+++ b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpGraphStageLogicConnectionShutdownSpec.scala
@@ -29,14 +29,14 @@ import pekko.stream.connectors.testkit.scaladsl.LogCapturing
import pekko.stream.scaladsl.Source
import pekko.util.ByteString
import com.rabbitmq.client.{ AddressResolver, Connection, ConnectionFactory, ShutdownListener }
-import org.scalatest.concurrent.ScalaFutures
import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._
import scala.util.control.NonFatal
-import org.scalatest.matchers.should.Matchers
-import org.scalatest.wordspec.AnyWordSpec
/**
* see [[https://github.com/akka/alpakka/issues/883]] and
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 7045b8ddc..d87864217 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -125,7 +125,8 @@ object Dependencies {
val Amqp = Seq(
libraryDependencies ++= Seq(
- "com.rabbitmq" % "amqp-client" % "5.21.0") ++ Mockito)
+ "com.rabbitmq" % "amqp-client" % "5.21.0",
+ "org.scalatestplus" %% scalaTestScalaCheckArtifact % scalaTestScalaCheckVersion % Test) ++ Mockito)
val AwsSpiPekkoHttp = Seq(
libraryDependencies ++= Seq(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]