This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new a4ecf0ab4 retry mqtt stream test that fails a lot (#196)
a4ecf0ab4 is described below
commit a4ecf0ab431c048cea7053e88688c438db5fdfc4
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Aug 21 11:58:03 2023 +0100
retry mqtt stream test that fails a lot (#196)
* increase mqtt test timeout
* Update MqttSessionSpec.scala
---
.../test/scala/docs/scaladsl/MqttSessionSpec.scala | 252 +++++++++++----------
1 file changed, 129 insertions(+), 123 deletions(-)
diff --git a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
index 854768115..237f301fa 100644
--- a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
+++ b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
@@ -31,8 +31,8 @@ import pekko.stream.OverflowStrategy
import pekko.testkit._
import pekko.util.{ ByteString, Timeout }
import org.scalatest.BeforeAndAfterAll
-import org.scalatest.concurrent.ScalaFutures
-import org.scalatest.time.{ Millis, Span }
+import org.scalatest.concurrent.{ Eventually, ScalaFutures }
+import org.scalatest.time.{ Millis, Minutes, Span }
import scala.concurrent.{ ExecutionContext, Promise }
import scala.concurrent.duration._
@@ -40,12 +40,15 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.LoggerFactory
+import scala.util.Right
+
class MqttSessionSpec
extends TestKit(ActorSystem("mqtt-spec"))
with AnyWordSpecLike
with BeforeAndAfterAll
with ScalaFutures
with Matchers
+ with Eventually
with LogCapturing {
val log = LoggerFactory.getLogger(classOf[MqttSessionSpec])
@@ -1919,156 +1922,159 @@ class MqttSessionSpec
// longer patience needed since Akka 2.6
implicit val patienceConfig: PatienceConfig =
PatienceConfig(scaled(1.second), scaled(50.millis))
- val serverSession = ActorMqttServerSession(settings.withProducerPubAckRecTimeout(10.millis))
-
- val client1 = TestProbe()
- val toClient1 = Sink.foreach[ByteString](bytes => client1.ref ! bytes)
- val (client1Connection, fromClient1) = Source
- .queue[ByteString](1, OverflowStrategy.dropHead)
- .toMat(BroadcastHub.sink)(Keep.both)
- .run()
-
- val pipeToClient1 = Flow.fromSinkAndSource(toClient1, fromClient1)
-
- val client2 = TestProbe()
- val toClient2 = Sink.foreach[ByteString](bytes => client2.ref ! bytes)
- val (client2Connection, fromClient2) = Source
- .queue[ByteString](0, OverflowStrategy.dropHead)
- .toMat(BroadcastHub.sink)(Keep.both)
- .run()
-
- val pipeToClient2 = Flow.fromSinkAndSource(toClient2, fromClient2)
+ // https://github.com/apache/incubator-pekko-connectors/issues/148
+ eventually(timeout(Span(1, Minutes))) {
+ val serverSession = ActorMqttServerSession(settings.withProducerPubAckRecTimeout(10.millis))
- val clientId = "some-client-id"
-
- val connect = Connect(clientId, ConnectFlags.None)
- val connect1Received = Promise[Done]()
- val connect2Received = Promise[Done]()
-
- val subscribe = Subscribe("some-topic")
- val subscribe1Received = Promise[Done]()
- val subscribe2Received = Promise[Done]()
-
- val pubAckReceived = Promise[Done]()
+ val client1 = TestProbe()
+ val toClient1 = Sink.foreach[ByteString](bytes => client1.ref ! bytes)
+ val (client1Connection, fromClient1) = Source
+ .queue[ByteString](1, OverflowStrategy.dropHead)
+ .toMat(BroadcastHub.sink)(Keep.both)
+ .run()
- val disconnect = Disconnect
- val disconnectReceived = Promise[Done]()
+ val pipeToClient1 = Flow.fromSinkAndSource(toClient1, fromClient1)
- val serverConnection1 =
- Source
- .queue[Command[Nothing]](1, OverflowStrategy.fail)
- .via(
- Mqtt
- .serverSessionFlow(serverSession, ByteString.empty)
- .join(pipeToClient1))
- .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
- case Right(Event(`connect`, _)) =>
- connect1Received.success(Done)
- case Right(Event(cp: Subscribe, _)) if cp.topicFilters == subscribe.topicFilters =>
- subscribe1Received.success(Done)
- case Right(Event(`disconnect`, _)) =>
- disconnectReceived.success(Done)
- case other => fail(s"didn't match `$other`")
- })
- .toMat(Sink.seq)(Keep.left)
+ val client2 = TestProbe()
+ val toClient2 = Sink.foreach[ByteString](bytes => client2.ref ! bytes)
+ val (client2Connection, fromClient2) = Source
+ .queue[ByteString](0, OverflowStrategy.dropHead)
+ .toMat(BroadcastHub.sink)(Keep.both)
.run()
- val connectBytes = connect.encode(ByteString.newBuilder).result()
- val connAck = ConnAck(ConnAckFlags.None, ConnAckReturnCode.ConnectionAccepted)
- val connAckBytes = connAck.encode(ByteString.newBuilder).result()
+ val pipeToClient2 = Flow.fromSinkAndSource(toClient2, fromClient2)
- val subscribeBytes = subscribe.encode(ByteString.newBuilder, PacketId(1)).result()
- val subAck = SubAck(PacketId(1), List(ControlPacketFlags.QoSAtLeastOnceDelivery))
- val subAckBytes = subAck.encode(ByteString.newBuilder).result()
+ val clientId = "some-client-id"
- val publish = Publish("some-topic", ByteString("some-payload"))
- val publishBytes = publish.encode(ByteString.newBuilder, Some(PacketId(1))).result()
- val dupPublishBytes = publish
- .copy(flags = publish.flags | ControlPacketFlags.DUP)
- .encode(ByteString.newBuilder, Some(PacketId(1)))
- .result()
- val pubAck = PubAck(PacketId(1))
- val pubAckBytes = pubAck.encode(ByteString.newBuilder).result()
+ val connect = Connect(clientId, ConnectFlags.None)
+ val connect1Received = Promise[Done]()
+ val connect2Received = Promise[Done]()
- val disconnectBytes = disconnect.encode(ByteString.newBuilder).result()
+ val subscribe = Subscribe("some-topic")
+ val subscribe1Received = Promise[Done]()
+ val subscribe2Received = Promise[Done]()
- client1Connection.offer(connectBytes)
+ val pubAckReceived = Promise[Done]()
- connect1Received.future.futureValue shouldBe Done
+ val disconnect = Disconnect
+ val disconnectReceived = Promise[Done]()
- serverConnection1.offer(Command(connAck))
- client1.expectMsg(connAckBytes)
+ val serverConnection1 =
+ Source
+ .queue[Command[Nothing]](1, OverflowStrategy.fail)
+ .via(
+ Mqtt
+ .serverSessionFlow(serverSession, ByteString.empty)
+ .join(pipeToClient1))
+ .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+ case Right(Event(`connect`, _)) =>
+ connect1Received.success(Done)
+ case Right(Event(cp: Subscribe, _)) if cp.topicFilters == subscribe.topicFilters =>
+ subscribe1Received.success(Done)
+ case Right(Event(`disconnect`, _)) =>
+ disconnectReceived.success(Done)
+ case other => fail(s"didn't match `$other`")
+ })
+ .toMat(Sink.seq)(Keep.left)
+ .run()
- client1Connection.offer(subscribeBytes)
+ val connectBytes = connect.encode(ByteString.newBuilder).result()
+ val connAck = ConnAck(ConnAckFlags.None, ConnAckReturnCode.ConnectionAccepted)
+ val connAckBytes = connAck.encode(ByteString.newBuilder).result()
- subscribe1Received.future.futureValue shouldBe Done
+ val subscribeBytes = subscribe.encode(ByteString.newBuilder, PacketId(1)).result()
+ val subAck = SubAck(PacketId(1), List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+ val subAckBytes = subAck.encode(ByteString.newBuilder).result()
- serverConnection1.offer(Command(subAck))
- client1.expectMsg(subAckBytes)
+ val publish = Publish("some-topic", ByteString("some-payload"))
+ val publishBytes = publish.encode(ByteString.newBuilder, Some(PacketId(1))).result()
+ val dupPublishBytes = publish
+ .copy(flags = publish.flags | ControlPacketFlags.DUP)
+ .encode(ByteString.newBuilder, Some(PacketId(1)))
+ .result()
+ val pubAck = PubAck(PacketId(1))
+ val pubAckBytes = pubAck.encode(ByteString.newBuilder).result()
- serverSession ! Command(publish)
- client1.expectMsg(publishBytes)
+ val disconnectBytes = disconnect.encode(ByteString.newBuilder).result()
- // Perform an explicit disconnect otherwise, if for example, we
- // just completed the client connection, the session may receive
- // the associated ConnectionLost signal for the new connection
- // given that the new connection occurs so quickly.
- client1Connection.offer(disconnectBytes)
+ client1Connection.offer(connectBytes)
- disconnectReceived.future.futureValue shouldBe Done
+ connect1Received.future.futureValue shouldBe Done
- val serverConnection2 =
- Source
- .queue[Command[Nothing]](1, OverflowStrategy.fail)
- .via(
- Mqtt
- .serverSessionFlow(serverSession, ByteString.empty)
- .join(pipeToClient2))
- .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
- case Right(Event(`connect`, _)) =>
- connect2Received.success(Done)
- case Right(Event(cp: Subscribe, _)) if cp.topicFilters == subscribe.topicFilters =>
- subscribe2Received.success(Done)
- case Right(Event(_: PubAck, _)) =>
- pubAckReceived.success(Done)
- case other => fail(s"didn't match `$other`")
- })
- .toMat(Sink.seq)(Keep.left)
- .run()
+ serverConnection1.offer(Command(connAck))
+ client1.expectMsg(connAckBytes)
- client2Connection.offer(connectBytes)
+ client1Connection.offer(subscribeBytes)
- connect2Received.future.futureValue shouldBe Done
+ subscribe1Received.future.futureValue shouldBe Done
- serverConnection2.offer(Command(connAck))
- client2.expectMsg(6.seconds, connAckBytes)
+ serverConnection1.offer(Command(subAck))
+ client1.expectMsg(subAckBytes)
- client2Connection.offer(subscribeBytes)
+ serverSession ! Command(publish)
+ client1.expectMsg(publishBytes)
- subscribe2Received.future.futureValue shouldBe Done
+ // Perform an explicit disconnect otherwise, if for example, we
+ // just completed the client connection, the session may receive
+ // the associated ConnectionLost signal for the new connection
+ // given that the new connection occurs so quickly.
+ client1Connection.offer(disconnectBytes)
- serverConnection2.offer(Command(subAck))
+ disconnectReceived.future.futureValue shouldBe Done
- client2.fishForMessage(3.seconds.dilated) {
- case msg: ByteString if msg == dupPublishBytes => true
- case _ => false
- }
+ val serverConnection2 =
+ Source
+ .queue[Command[Nothing]](1, OverflowStrategy.fail)
+ .via(
+ Mqtt
+ .serverSessionFlow(serverSession, ByteString.empty)
+ .join(pipeToClient2))
+ .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+ case Right(Event(`connect`, _)) =>
+ connect2Received.success(Done)
+ case Right(Event(cp: Subscribe, _)) if cp.topicFilters == subscribe.topicFilters =>
+ subscribe2Received.success(Done)
+ case Right(Event(_: PubAck, _)) =>
+ pubAckReceived.success(Done)
+ case other => fail(s"didn't match `$other`")
+ })
+ .toMat(Sink.seq)(Keep.left)
+ .run()
+
+ client2Connection.offer(connectBytes)
+
+ connect2Received.future.futureValue shouldBe Done
+
+ serverConnection2.offer(Command(connAck))
+ client2.expectMsg(6.seconds, connAckBytes)
+
+ client2Connection.offer(subscribeBytes)
+
+ subscribe2Received.future.futureValue shouldBe Done
+
+ serverConnection2.offer(Command(subAck))
+
+ client2.fishForMessage(3.seconds.dilated) {
+ case msg: ByteString if msg == dupPublishBytes => true
+ case _ => false
+ }
- client2Connection.offer(pubAckBytes)
- pubAckReceived.future.futureValue shouldBe Done
+ client2Connection.offer(pubAckBytes)
+ pubAckReceived.future.futureValue shouldBe Done
- client1Connection.complete()
- client2Connection.complete()
- serverConnection1.complete()
- serverConnection2.complete()
+ client1Connection.complete()
+ client2Connection.complete()
+ serverConnection1.complete()
+ serverConnection2.complete()
- for {
- _ <- client1Connection.watchCompletion()
- _ <- client2Connection.watchCompletion()
- _ <- serverConnection1.watchCompletion()
- _ <- serverConnection2.watchCompletion()
- } serverSession.shutdown()
+ for {
+ _ <- client1Connection.watchCompletion()
+ _ <- client2Connection.watchCompletion()
+ _ <- serverConnection1.watchCompletion()
+ _ <- serverConnection2.watchCompletion()
+ } serverSession.shutdown()
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]