This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new a4ecf0ab4 retry mqtt stream test that fails a lot (#196)
a4ecf0ab4 is described below

commit a4ecf0ab431c048cea7053e88688c438db5fdfc4
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Aug 21 11:58:03 2023 +0100

    retry mqtt stream test that fails a lot (#196)
    
    * increase mqtt test timeout
    
    * Update MqttSessionSpec.scala
---
 .../test/scala/docs/scaladsl/MqttSessionSpec.scala | 252 +++++++++++----------
 1 file changed, 129 insertions(+), 123 deletions(-)

diff --git a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
index 854768115..237f301fa 100644
--- a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
+++ b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
@@ -31,8 +31,8 @@ import pekko.stream.OverflowStrategy
 import pekko.testkit._
 import pekko.util.{ ByteString, Timeout }
 import org.scalatest.BeforeAndAfterAll
-import org.scalatest.concurrent.ScalaFutures
-import org.scalatest.time.{ Millis, Span }
+import org.scalatest.concurrent.{ Eventually, ScalaFutures }
+import org.scalatest.time.{ Millis, Minutes, Span }
 
 import scala.concurrent.{ ExecutionContext, Promise }
 import scala.concurrent.duration._
@@ -40,12 +40,15 @@ import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpecLike
 import org.slf4j.LoggerFactory
 
+import scala.util.Right
+
 class MqttSessionSpec
     extends TestKit(ActorSystem("mqtt-spec"))
     with AnyWordSpecLike
     with BeforeAndAfterAll
     with ScalaFutures
     with Matchers
+    with Eventually
     with LogCapturing {
 
   val log = LoggerFactory.getLogger(classOf[MqttSessionSpec])
@@ -1919,156 +1922,159 @@ class MqttSessionSpec
       // longer patience needed since Akka 2.6
       implicit val patienceConfig: PatienceConfig = 
PatienceConfig(scaled(1.second), scaled(50.millis))
 
-      val serverSession = 
ActorMqttServerSession(settings.withProducerPubAckRecTimeout(10.millis))
-
-      val client1 = TestProbe()
-      val toClient1 = Sink.foreach[ByteString](bytes => client1.ref ! bytes)
-      val (client1Connection, fromClient1) = Source
-        .queue[ByteString](1, OverflowStrategy.dropHead)
-        .toMat(BroadcastHub.sink)(Keep.both)
-        .run()
-
-      val pipeToClient1 = Flow.fromSinkAndSource(toClient1, fromClient1)
-
-      val client2 = TestProbe()
-      val toClient2 = Sink.foreach[ByteString](bytes => client2.ref ! bytes)
-      val (client2Connection, fromClient2) = Source
-        .queue[ByteString](0, OverflowStrategy.dropHead)
-        .toMat(BroadcastHub.sink)(Keep.both)
-        .run()
-
-      val pipeToClient2 = Flow.fromSinkAndSource(toClient2, fromClient2)
+      // https://github.com/apache/incubator-pekko-connectors/issues/148
+      eventually(timeout(Span(1, Minutes))) {
+        val serverSession = 
ActorMqttServerSession(settings.withProducerPubAckRecTimeout(10.millis))
 
-      val clientId = "some-client-id"
-
-      val connect = Connect(clientId, ConnectFlags.None)
-      val connect1Received = Promise[Done]()
-      val connect2Received = Promise[Done]()
-
-      val subscribe = Subscribe("some-topic")
-      val subscribe1Received = Promise[Done]()
-      val subscribe2Received = Promise[Done]()
-
-      val pubAckReceived = Promise[Done]()
+        val client1 = TestProbe()
+        val toClient1 = Sink.foreach[ByteString](bytes => client1.ref ! bytes)
+        val (client1Connection, fromClient1) = Source
+          .queue[ByteString](1, OverflowStrategy.dropHead)
+          .toMat(BroadcastHub.sink)(Keep.both)
+          .run()
 
-      val disconnect = Disconnect
-      val disconnectReceived = Promise[Done]()
+        val pipeToClient1 = Flow.fromSinkAndSource(toClient1, fromClient1)
 
-      val serverConnection1 =
-        Source
-          .queue[Command[Nothing]](1, OverflowStrategy.fail)
-          .via(
-            Mqtt
-              .serverSessionFlow(serverSession, ByteString.empty)
-              .join(pipeToClient1))
-          .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
-            case Right(Event(`connect`, _)) =>
-              connect1Received.success(Done)
-            case Right(Event(cp: Subscribe, _)) if cp.topicFilters == 
subscribe.topicFilters =>
-              subscribe1Received.success(Done)
-            case Right(Event(`disconnect`, _)) =>
-              disconnectReceived.success(Done)
-            case other => fail(s"didn't match `$other`")
-          })
-          .toMat(Sink.seq)(Keep.left)
+        val client2 = TestProbe()
+        val toClient2 = Sink.foreach[ByteString](bytes => client2.ref ! bytes)
+        val (client2Connection, fromClient2) = Source
+          .queue[ByteString](0, OverflowStrategy.dropHead)
+          .toMat(BroadcastHub.sink)(Keep.both)
           .run()
 
-      val connectBytes = connect.encode(ByteString.newBuilder).result()
-      val connAck = ConnAck(ConnAckFlags.None, 
ConnAckReturnCode.ConnectionAccepted)
-      val connAckBytes = connAck.encode(ByteString.newBuilder).result()
+        val pipeToClient2 = Flow.fromSinkAndSource(toClient2, fromClient2)
 
-      val subscribeBytes = subscribe.encode(ByteString.newBuilder, 
PacketId(1)).result()
-      val subAck = SubAck(PacketId(1), 
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
-      val subAckBytes = subAck.encode(ByteString.newBuilder).result()
+        val clientId = "some-client-id"
 
-      val publish = Publish("some-topic", ByteString("some-payload"))
-      val publishBytes = publish.encode(ByteString.newBuilder, 
Some(PacketId(1))).result()
-      val dupPublishBytes = publish
-        .copy(flags = publish.flags | ControlPacketFlags.DUP)
-        .encode(ByteString.newBuilder, Some(PacketId(1)))
-        .result()
-      val pubAck = PubAck(PacketId(1))
-      val pubAckBytes = pubAck.encode(ByteString.newBuilder).result()
+        val connect = Connect(clientId, ConnectFlags.None)
+        val connect1Received = Promise[Done]()
+        val connect2Received = Promise[Done]()
 
-      val disconnectBytes = disconnect.encode(ByteString.newBuilder).result()
+        val subscribe = Subscribe("some-topic")
+        val subscribe1Received = Promise[Done]()
+        val subscribe2Received = Promise[Done]()
 
-      client1Connection.offer(connectBytes)
+        val pubAckReceived = Promise[Done]()
 
-      connect1Received.future.futureValue shouldBe Done
+        val disconnect = Disconnect
+        val disconnectReceived = Promise[Done]()
 
-      serverConnection1.offer(Command(connAck))
-      client1.expectMsg(connAckBytes)
+        val serverConnection1 =
+          Source
+            .queue[Command[Nothing]](1, OverflowStrategy.fail)
+            .via(
+              Mqtt
+                .serverSessionFlow(serverSession, ByteString.empty)
+                .join(pipeToClient1))
+            .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+              case Right(Event(`connect`, _)) =>
+                connect1Received.success(Done)
+              case Right(Event(cp: Subscribe, _)) if cp.topicFilters == 
subscribe.topicFilters =>
+                subscribe1Received.success(Done)
+              case Right(Event(`disconnect`, _)) =>
+                disconnectReceived.success(Done)
+              case other => fail(s"didn't match `$other`")
+            })
+            .toMat(Sink.seq)(Keep.left)
+            .run()
 
-      client1Connection.offer(subscribeBytes)
+        val connectBytes = connect.encode(ByteString.newBuilder).result()
+        val connAck = ConnAck(ConnAckFlags.None, 
ConnAckReturnCode.ConnectionAccepted)
+        val connAckBytes = connAck.encode(ByteString.newBuilder).result()
 
-      subscribe1Received.future.futureValue shouldBe Done
+        val subscribeBytes = subscribe.encode(ByteString.newBuilder, 
PacketId(1)).result()
+        val subAck = SubAck(PacketId(1), 
List(ControlPacketFlags.QoSAtLeastOnceDelivery))
+        val subAckBytes = subAck.encode(ByteString.newBuilder).result()
 
-      serverConnection1.offer(Command(subAck))
-      client1.expectMsg(subAckBytes)
+        val publish = Publish("some-topic", ByteString("some-payload"))
+        val publishBytes = publish.encode(ByteString.newBuilder, 
Some(PacketId(1))).result()
+        val dupPublishBytes = publish
+          .copy(flags = publish.flags | ControlPacketFlags.DUP)
+          .encode(ByteString.newBuilder, Some(PacketId(1)))
+          .result()
+        val pubAck = PubAck(PacketId(1))
+        val pubAckBytes = pubAck.encode(ByteString.newBuilder).result()
 
-      serverSession ! Command(publish)
-      client1.expectMsg(publishBytes)
+        val disconnectBytes = disconnect.encode(ByteString.newBuilder).result()
 
-      // Perform an explicit disconnect otherwise, if for example, we
-      // just completed the client connection, the session may receive
-      // the associated ConnectionLost signal for the new connection
-      // given that the new connection occurs so quickly.
-      client1Connection.offer(disconnectBytes)
+        client1Connection.offer(connectBytes)
 
-      disconnectReceived.future.futureValue shouldBe Done
+        connect1Received.future.futureValue shouldBe Done
 
-      val serverConnection2 =
-        Source
-          .queue[Command[Nothing]](1, OverflowStrategy.fail)
-          .via(
-            Mqtt
-              .serverSessionFlow(serverSession, ByteString.empty)
-              .join(pipeToClient2))
-          .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
-            case Right(Event(`connect`, _)) =>
-              connect2Received.success(Done)
-            case Right(Event(cp: Subscribe, _)) if cp.topicFilters == 
subscribe.topicFilters =>
-              subscribe2Received.success(Done)
-            case Right(Event(_: PubAck, _)) =>
-              pubAckReceived.success(Done)
-            case other => fail(s"didn't match `$other`")
-          })
-          .toMat(Sink.seq)(Keep.left)
-          .run()
+        serverConnection1.offer(Command(connAck))
+        client1.expectMsg(connAckBytes)
 
-      client2Connection.offer(connectBytes)
+        client1Connection.offer(subscribeBytes)
 
-      connect2Received.future.futureValue shouldBe Done
+        subscribe1Received.future.futureValue shouldBe Done
 
-      serverConnection2.offer(Command(connAck))
-      client2.expectMsg(6.seconds, connAckBytes)
+        serverConnection1.offer(Command(subAck))
+        client1.expectMsg(subAckBytes)
 
-      client2Connection.offer(subscribeBytes)
+        serverSession ! Command(publish)
+        client1.expectMsg(publishBytes)
 
-      subscribe2Received.future.futureValue shouldBe Done
+        // Perform an explicit disconnect otherwise, if for example, we
+        // just completed the client connection, the session may receive
+        // the associated ConnectionLost signal for the new connection
+        // given that the new connection occurs so quickly.
+        client1Connection.offer(disconnectBytes)
 
-      serverConnection2.offer(Command(subAck))
+        disconnectReceived.future.futureValue shouldBe Done
 
-      client2.fishForMessage(3.seconds.dilated) {
-        case msg: ByteString if msg == dupPublishBytes => true
-        case _                                         => false
-      }
+        val serverConnection2 =
+          Source
+            .queue[Command[Nothing]](1, OverflowStrategy.fail)
+            .via(
+              Mqtt
+                .serverSessionFlow(serverSession, ByteString.empty)
+                .join(pipeToClient2))
+            .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+              case Right(Event(`connect`, _)) =>
+                connect2Received.success(Done)
+              case Right(Event(cp: Subscribe, _)) if cp.topicFilters == 
subscribe.topicFilters =>
+                subscribe2Received.success(Done)
+              case Right(Event(_: PubAck, _)) =>
+                pubAckReceived.success(Done)
+              case other => fail(s"didn't match `$other`")
+            })
+            .toMat(Sink.seq)(Keep.left)
+            .run()
+
+        client2Connection.offer(connectBytes)
+
+        connect2Received.future.futureValue shouldBe Done
+
+        serverConnection2.offer(Command(connAck))
+        client2.expectMsg(6.seconds, connAckBytes)
+
+        client2Connection.offer(subscribeBytes)
+
+        subscribe2Received.future.futureValue shouldBe Done
+
+        serverConnection2.offer(Command(subAck))
+
+        client2.fishForMessage(3.seconds.dilated) {
+          case msg: ByteString if msg == dupPublishBytes => true
+          case _                                         => false
+        }
 
-      client2Connection.offer(pubAckBytes)
-      pubAckReceived.future.futureValue shouldBe Done
+        client2Connection.offer(pubAckBytes)
+        pubAckReceived.future.futureValue shouldBe Done
 
-      client1Connection.complete()
-      client2Connection.complete()
-      serverConnection1.complete()
-      serverConnection2.complete()
+        client1Connection.complete()
+        client2Connection.complete()
+        serverConnection1.complete()
+        serverConnection2.complete()
 
-      for {
-        _ <- client1Connection.watchCompletion()
-        _ <- client2Connection.watchCompletion()
-        _ <- serverConnection1.watchCompletion()
-        _ <- serverConnection2.watchCompletion()
-      } serverSession.shutdown()
+        for {
+          _ <- client1Connection.watchCompletion()
+          _ <- client2Connection.watchCompletion()
+          _ <- serverConnection1.watchCompletion()
+          _ <- serverConnection2.watchCompletion()
+        } serverSession.shutdown()
 
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to