This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new c374df2c5 Add MQTT v5 support (#1035)
c374df2c5 is described below
commit c374df2c53177a07ef7b763ee071eb3c961f2ab3
Author: Angel Sanadinov <[email protected]>
AuthorDate: Wed Apr 23 20:07:25 2025 +0200
Add MQTT v5 support (#1035)
* MQTT: Update to latest mosquitto image for mqtt tests
* MQTTv5: Add new mqtt connector with support for the v5 protocol
* MQTTv5 Skip mima check for new submodule
* MQTT: Update mqtt and mqttv5 docs to mention which versions are supported
* MQTTv5: Improve handling of null values coming from paho
---
.github/autolabeler.yml | 1 +
.github/workflows/check-build-test.yml | 1 +
build.sbt | 6 +-
docker-compose.yml | 4 +-
docs/src/main/paradox/index.md | 1 +
docs/src/main/paradox/mqtt.md | 2 +
docs/src/main/paradox/{mqtt.md => mqttv5.md} | 72 +--
mqtt/src/test/travis/acl | 27 ++
mqtt/src/test/travis/auth.conf | 2 -
mqtt/src/test/travis/mosquitto.conf | 13 +
.../connectors/mqttv5/impl/MqttFlowStage.scala | 518 +++++++++++++++++++++
.../mqttv5/impl/MqttFlowStageWithAck.scala | 102 ++++
.../connectors/mqttv5/javadsl/MqttFlow.scala | 84 ++++
.../mqttv5/javadsl/MqttMessageWithAck.scala | 55 +++
.../connectors/mqttv5/javadsl/MqttSink.scala | 45 ++
.../connectors/mqttv5/javadsl/MqttSource.scala | 63 +++
.../pekko/stream/connectors/mqttv5/model.scala | 72 +++
.../connectors/mqttv5/scaladsl/MqttFlow.scala | 96 ++++
.../mqttv5/scaladsl/MqttMessageWithAck.scala | 59 +++
.../connectors/mqttv5/scaladsl/MqttSink.scala | 45 ++
.../connectors/mqttv5/scaladsl/MqttSource.scala | 60 +++
.../pekko/stream/connectors/mqttv5/settings.scala | 516 ++++++++++++++++++++
.../src/test/java/docs/javadsl/MqttFlowTest.java | 156 +++++++
.../src/test/java/docs/javadsl/MqttSourceTest.java | 341 ++++++++++++++
mqttv5/src/test/resources/application.conf | 7 +
mqttv5/src/test/resources/logback-test.xml | 29 ++
.../test/scala/docs/scaladsl/MqttFlowSpec.scala | 99 ++++
.../test/scala/docs/scaladsl/MqttSinkSpec.scala | 168 +++++++
.../test/scala/docs/scaladsl/MqttSourceSpec.scala | 495 ++++++++++++++++++++
.../test/scala/docs/scaladsl/MqttSpecBase.scala | 49 ++
project/Dependencies.scala | 4 +
project/project-info.conf | 11 +
32 files changed, 3163 insertions(+), 40 deletions(-)
diff --git a/.github/autolabeler.yml b/.github/autolabeler.yml
index f1f644ab6..b87349761 100644
--- a/.github/autolabeler.yml
+++ b/.github/autolabeler.yml
@@ -37,6 +37,7 @@ dependency-change: "/project/Dependencies.scala"
'p:kudu': ["/kudu"]
'p:mongodb': ["/mongodb"]
'p:mqtt': ["/mqtt"]
+'p:mqttv5': ["/mqttv5"]
'p:mqtt-streaming': ["/mqtt-streaming", "/mqtt-streaming-bench"]
'p:orientdb': ["/orientdb"]
'p:pravega': ["/pravega"]
diff --git a/.github/workflows/check-build-test.yml
b/.github/workflows/check-build-test.yml
index c03158fae..4a988cb55 100644
--- a/.github/workflows/check-build-test.yml
+++ b/.github/workflows/check-build-test.yml
@@ -129,6 +129,7 @@ jobs:
- { connector: kudu, pre_cmd: 'docker
compose up -d kudu-master kudu-tserver' }
- { connector: mongodb, pre_cmd: 'docker
compose up -d mongo' }
- { connector: mqtt, pre_cmd: 'docker
compose up -d mqtt' }
+ - { connector: mqttv5, pre_cmd: 'docker
compose up -d mqtt' }
- { connector: mqtt-streaming, pre_cmd: 'docker
compose up -d mqtt' }
- { connector: orientdb, pre_cmd: 'docker
compose up -d orientdb' }
- { connector: pravega, pre_cmd: 'docker
compose up -d pravega'}
diff --git a/build.sbt b/build.sbt
index 8d34f9abb..2ee321b5f 100644
--- a/build.sbt
+++ b/build.sbt
@@ -50,6 +50,7 @@ lazy val userProjects: Seq[ProjectReference] =
List[ProjectReference](
kudu,
mongodb,
mqtt,
+ mqttv5,
mqttStreaming,
orientdb,
pravega,
@@ -304,6 +305,8 @@ lazy val mongodb = pekkoConnectorProject("mongodb",
"mongodb", Dependencies.Mong
lazy val mqtt = pekkoConnectorProject("mqtt", "mqtt", Dependencies.Mqtt)
+lazy val mqttv5 = pekkoConnectorProject("mqttv5", "mqttv5",
Dependencies.MqttV5)
+
lazy val mqttStreaming =
pekkoConnectorProject("mqtt-streaming", "mqttStreaming",
Dependencies.MqttStreaming,
MetaInfLicenseNoticeCopy.mqttStreamingSettings)
@@ -436,6 +439,7 @@ lazy val docs = project
"scaladoc.spray.json.base_url" ->
s"https://javadoc.io/doc/io.spray/spray-json_${scalaBinaryVersion.value}/latest/",
// Eclipse Paho client for MQTT
"javadoc.org.eclipse.paho.client.mqttv3.base_url" ->
"https://www.eclipse.org/paho/files/javadoc/",
+ "javadoc.org.eclipse.paho.mqttv5.client.base_url" ->
"https://www.eclipse.org/paho/files/javadoc/",
"javadoc.org.bson.codecs.configuration.base_url" ->
"https://mongodb.github.io/mongo-java-driver/3.7/javadoc/",
"scaladoc.scala.base_url" ->
s"https://www.scala-lang.org/api/${scalaBinaryVersion.value}.x/",
"scaladoc.org.apache.pekko.stream.connectors.base_url" ->
s"/${(Preprocess / siteSubdirName).value}/",
@@ -476,7 +480,7 @@ lazy val billOfMaterials = Project("bill-of-materials",
file("bill-of-materials"
description := s"${description.value} (depending on Scala
${CrossVersion.binaryScalaVersion(scalaVersion.value)})")
val mimaCompareVersion = "1.0.2"
-val noMimaChecks = Set("couchbase3", "jakartams", "aws.api.pekko.http")
+val noMimaChecks = Set("couchbase3", "jakartams", "aws.api.pekko.http",
"mqttv5")
def pekkoConnectorProject(projectId: String,
moduleName: String,
diff --git a/docker-compose.yml b/docker-compose.yml
index d106fbb1e..7ad3fc0cd 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -176,11 +176,11 @@ services:
ports:
- "8086:8086"
mqtt:
- image: toke/mosquitto
+ image: eclipse-mosquitto
ports:
- "1883:1883"
volumes:
- - ./mqtt/src/test/travis:/mqtt/config/conf.d
+ - ./mqtt/src/test/travis:/mosquitto/config
orientdb:
image: orientdb:3.2.27
ports:
diff --git a/docs/src/main/paradox/index.md b/docs/src/main/paradox/index.md
index 680ff018c..1579b0a25 100644
--- a/docs/src/main/paradox/index.md
+++ b/docs/src/main/paradox/index.md
@@ -45,6 +45,7 @@ The [Apache Pekko Connectors
project](https://pekko.apache.org/docs/pekko-connec
* [JMS](jms/index.md)
* [MongoDB](mongodb.md)
* [MQTT](mqtt.md)
+* [MQTTv5](mqttv5.md)
* [MQTT Streaming](mqtt-streaming.md)
* [Opensearch](opensearch.md)
* [OrientDB](orientdb.md)
diff --git a/docs/src/main/paradox/mqtt.md b/docs/src/main/paradox/mqtt.md
index 3e70a5754..20254ef83 100644
--- a/docs/src/main/paradox/mqtt.md
+++ b/docs/src/main/paradox/mqtt.md
@@ -6,6 +6,8 @@ MQTT stands for MQ Telemetry Transport. It is a
publish/subscribe, extremely sim
Further information on [mqtt.org](https://mqtt.org/).
+> Note: This connector supports only versions 3.1 and 3.1.1 of the MQTT
protocol; for version 5.0, see @ref[mqttv5](mqttv5.md).
+
@@@
@@@ note { title="Streaming Differences" }
diff --git a/docs/src/main/paradox/mqtt.md b/docs/src/main/paradox/mqttv5.md
similarity index 64%
copy from docs/src/main/paradox/mqtt.md
copy to docs/src/main/paradox/mqttv5.md
index 3e70a5754..640a15ac1 100644
--- a/docs/src/main/paradox/mqtt.md
+++ b/docs/src/main/paradox/mqttv5.md
@@ -1,12 +1,14 @@
-# MQTT
+# MQTT v5
-@@@ note { title="MQTT" }
+@@@ note { title="MQTT v5" }
-MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely
simple and lightweight messaging protocol, designed for constrained devices and
low-bandwidth, high-latency or unreliable networks. The design principles are
to minimise network bandwidth and device resource requirements whilst also
attempting to ensure reliability and some degree of assurance of delivery.
These principles also turn out to make the protocol ideal of the emerging
“machine-to-machine” (M2M) or “In [...]
+MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely
simple and lightweight messaging protocol, designed for constrained devices and
low-bandwidth, high-latency or unreliable networks. The design principles are
to minimise network bandwidth and device resource requirements whilst also
attempting to ensure reliability and some degree of assurance of delivery.
These principles also turn out to make the protocol ideal for the emerging
“machine-to-machine” (M2M) or “In [...]
Further information on [mqtt.org](https://mqtt.org/).
-@@@
+> Note: This connector supports only version 5.0 of the MQTT protocol; for
versions 3.1 and 3.1.1, see @ref[mqtt](mqtt.md).
+
+@@@
@@@ note { title="Streaming Differences" }
@@ -16,13 +18,13 @@ Apache Pekko Connectors contains @ref[another MQTT
connector](mqtt-streaming.md)
The Apache Pekko Connectors MQTT connector provides an Apache Pekko Stream
source, sink and flow to connect to MQTT brokers. It is based on the [Eclipse
Paho Java client](https://www.eclipse.org/paho/clients/java/).
-@@project-info{ projectId="mqtt" }
+@@project-info{ projectId="mqttv5" }
## Artifacts
@@dependency [sbt,Maven,Gradle] {
group=org.apache.pekko
- artifact=pekko-connectors-mqtt_$scala.binary.version$
+ artifact=pekko-connectors-mqttv5_$scala.binary.version$
version=$project.version$
symbol2=PekkoVersion
value2=$pekko.version$
@@ -33,24 +35,24 @@ The Apache Pekko Connectors MQTT connector provides an
Apache Pekko Stream sourc
The table below shows direct dependencies of this module and the second tab
shows all libraries it depends on transitively.
-@@dependencies { projectId="mqtt" }
+@@dependencies { projectId="mqttv5" }
## Settings
-The required `MqttConnectionSettings`
(@scaladoc[API](org.apache.pekko.stream.connectors.mqtt.MqttConnectionSettings$))
settings to connect to an MQTT server are
+The required `MqttConnectionSettings`
(@scaladoc[API](org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings$))
settings to connect to an MQTT server are
1. the MQTT broker address
1. a unique ID for the client (setting it to the empty string should let the
MQTT broker assign it, but not all do; you might want to generate it)
-1. the MQTT client persistence to use (eg.
@javadoc[MemoryPersistence](org.eclipse.paho.client.mqttv3.persist.MemoryPersistence))
which allows to control reliability guarantees
+1. the MQTT client persistence to use (e.g.
@javadoc[MemoryPersistence](org.eclipse.paho.mqttv5.client.persist.MemoryPersistence)),
which allows you to control reliability guarantees
Scala
-: @@snip [snip](/mqtt/src/test/scala/docs/scaladsl/MqttSourceSpec.scala) {
#create-connection-settings }
+: @@snip [snip](/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala) {
#create-connection-settings }
Java
-: @@snip [snip](/mqtt/src/test/java/docs/javadsl/MqttSourceTest.java) {
#create-connection-settings }
+: @@snip [snip](/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java) {
#create-connection-settings }
-Most settings are passed on to Paho's `MqttConnectOptions`
(@javadoc[API](org.eclipse.paho.client.mqttv3.MqttConnectOptions)) and
documented there.
+Most settings are passed on to Paho's `MqttConnectionOptions`
(@javadoc[API](org.eclipse.paho.mqttv5.client.MqttConnectionOptions)) and
documented there.
@@@ warning { title='Use delayed stream restarts' }
Note that the following examples do not provide any connection management and
are designed to get you going quickly. Consider empty client IDs to
auto-generate unique identifiers and the use of @extref:[delayed stream
restarts](pekko:stream/stream-error.html?language=scala#delayed-restarts-with-a-backoff-stage).
The underlying Paho library's auto-reconnect feature [does not handle initial
connections by design](https://github.com/eclipse/paho.mqtt.golang/issues/77).
@@ -62,10 +64,10 @@ Note that the following examples do not provide any
connection management and ar
To connect with transport-level security configure the address as `ssl://`,
set authentication details and pass in a socket factory.
Scala
-: @@snip [snip](/mqtt/src/test/scala/docs/scaladsl/MqttSourceSpec.scala) {
#ssl-settings }
+: @@snip [snip](/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala) {
#ssl-settings }
Java
-: @@snip [snip](/mqtt/src/test/java/docs/javadsl/MqttSourceTest.java) {
#ssl-settings }
+: @@snip [snip](/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java) {
#ssl-settings }
## Reading from MQTT
@@ -78,14 +80,14 @@ The `bufferSize` sets the maximum number of messages read
from MQTT before back-
Scala
-: @@snip [snip](/mqtt/src/test/scala/docs/scaladsl/MqttSourceSpec.scala) {
#create-source }
+: @@snip [snip](/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala) {
#create-source }
Java
-: @@snip [snip](/mqtt/src/test/java/docs/javadsl/MqttSourceTest.java) {
#create-source }
+: @@snip [snip](/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java) {
#create-source }
This source has a materialized value
(@scala[@scaladoc[Future[Done]](scala.concurrent.Future)]@java[@javadoc[CompletionStage<Done>](java.util.concurrent.CompletionStage)])
which is completed when the subscription to the MQTT broker has been
established.
-MQTT `atMostOnce` automatically acknowledges messages back to the server when
they are passed downstream.
+MQTT `atMostOnce` automatically acknowledges messages back to the server when
they are passed downstream.
### At least once
@@ -95,39 +97,39 @@ Please note that for manual acks to work `CleanSession`
should be set to false a
The `bufferSize` sets the maximum number of messages read from MQTT before
back-pressure applies.
Scala
-: @@snip [snip](/mqtt/src/test/scala/docs/scaladsl/MqttSourceSpec.scala) {
#create-source-with-manualacks }
+: @@snip [snip](/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala) {
#create-source-with-manualacks }
Java
-: @@snip [snip](/mqtt/src/test/java/docs/javadsl/MqttSourceTest.java) {
#create-source-with-manualacks }
+: @@snip [snip](/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java) {
#create-source-with-manualacks }
-The `atLeastOnce` source returns
@scala[@scaladoc[MqttMessageWithAck](org.apache.pekko.stream.connectors.mqtt.scaladsl.MqttMessageWithAck)]@java[@scaladoc[MqttMessageWithAck](org.apache.pekko.stream.connectors.mqtt.javadsl.MqttMessageWithAck)]
so you can acknowledge them by calling `ack()`.
+The `atLeastOnce` source returns
@scala[@scaladoc[MqttMessageWithAck](org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttMessageWithAck)]@java[@scaladoc[MqttMessageWithAck](org.apache.pekko.stream.connectors.mqttv5.javadsl.MqttMessageWithAck)]
so you can acknowledge them by calling `ack()`.
Scala
-: @@snip [snip](/mqtt/src/test/scala/docs/scaladsl/MqttSourceSpec.scala) {
#run-source-with-manualacks }
+: @@snip [snip](/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala) {
#run-source-with-manualacks }
Java
-: @@snip [snip](/mqtt/src/test/java/docs/javadsl/MqttSourceTest.java) {
#run-source-with-manualacks }
+: @@snip [snip](/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java) {
#run-source-with-manualacks }
## Publishing to MQTT
-To publish messages to the MQTT server create a sink be specifying
`MqttConnectionSettings`
(@scaladoc[API](org.apache.pekko.stream.connectors.mqtt.MqttConnectionSettings$))
and a default Quality of Service-level.
+To publish messages to the MQTT server, create a sink by specifying
`MqttConnectionSettings`
(@scaladoc[API](org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings$))
and a default Quality of Service-level.
Scala
-: @@snip [snip](/mqtt/src/test/scala/docs/scaladsl/MqttSourceSpec.scala) {
#run-sink }
+: @@snip [snip](/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala) {
#run-sink }
Java
-: @@snip [snip](/mqtt/src/test/java/docs/javadsl/MqttSourceTest.java) {
#run-sink }
+: @@snip [snip](/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java) {
#run-sink }
The Quality of Service-level and the retained flag can be configured on a
per-message basis.
Scala
-: @@snip [snip](/mqtt/src/test/scala/docs/scaladsl/MqttSourceSpec.scala) {
#will-message }
+: @@snip [snip](/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala) {
#will-message }
Java
-: @@snip [snip](/mqtt/src/test/java/docs/javadsl/MqttSourceTest.java) {
#will-message }
+: @@snip [snip](/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java) {
#will-message }
## Publish and subscribe in a single flow
@@ -137,19 +139,19 @@ It is also possible to connect to the MQTT server in
bidirectional fashion, usin
The `bufferSize` sets the maximum number of messages read from MQTT before
back-pressure applies.
Scala
-: @@snip [snip](/mqtt/src/test/scala/docs/scaladsl/MqttFlowSpec.scala) {
#create-flow }
+: @@snip [snip](/mqttv5/src/test/scala/docs/scaladsl/MqttFlowSpec.scala) {
#create-flow }
Java
-: @@snip [snip](/mqtt/src/test/java/docs/javadsl/MqttFlowTest.java) {
#create-flow }
+: @@snip [snip](/mqttv5/src/test/java/docs/javadsl/MqttFlowTest.java) {
#create-flow }
Run the flow by connecting a source of messages to be published and a sink for
received messages.
Scala
-: @@snip [snip](/mqtt/src/test/scala/docs/scaladsl/MqttFlowSpec.scala) {
#run-flow }
+: @@snip [snip](/mqttv5/src/test/scala/docs/scaladsl/MqttFlowSpec.scala) {
#run-flow }
Java
-: @@snip [snip](/mqtt/src/test/java/docs/javadsl/MqttFlowTest.java) {
#run-flow }
+: @@snip [snip](/mqttv5/src/test/java/docs/javadsl/MqttFlowTest.java) {
#run-flow }
## Using flow with Acknowledge on message sent
@@ -161,19 +163,19 @@ This flow can be used when the source must be
acknowledged **only** when the mes
The flow emits `MqttMessageWithAck`s with the message swapped with the new
content and keeps the ack function from the original source.
Scala
-: @@snip [snip](/mqtt/src/test/scala/docs/scaladsl/MqttFlowSpec.scala) {
#create-flow-ack }
+: @@snip [snip](/mqttv5/src/test/scala/docs/scaladsl/MqttFlowSpec.scala) {
#create-flow-ack }
Java
-: @@snip [snip](/mqtt/src/test/java/docs/javadsl/MqttFlowTest.java) {
#create-flow-ack }
+: @@snip [snip](/mqttv5/src/test/java/docs/javadsl/MqttFlowTest.java) {
#create-flow-ack }
Run the flow by connecting a source of messages to be published and a sink for
received messages.
When the message are sent, an ack is called.
Scala
-: @@snip [snip](/mqtt/src/test/scala/docs/scaladsl/MqttFlowSpec.scala) {
#run-flow-ack }
+: @@snip [snip](/mqttv5/src/test/scala/docs/scaladsl/MqttFlowSpec.scala) {
#run-flow-ack }
Java
-: @@snip [snip](/mqtt/src/test/java/docs/javadsl/MqttFlowTest.java) {
#run-flow-ack }
+: @@snip [snip](/mqttv5/src/test/java/docs/javadsl/MqttFlowTest.java) {
#run-flow-ack }
## Capturing MQTT client logging
diff --git a/mqtt/src/test/travis/acl b/mqtt/src/test/travis/acl
index 340bab3d3..8bf95d38d 100644
--- a/mqtt/src/test/travis/acl
+++ b/mqtt/src/test/travis/acl
@@ -17,7 +17,34 @@ topic flow-test/topic-ack
topic typed-flow-spec/topic1
topic untyped-flow-spec/topic1
+topic v5/source-spec/topic1
+topic v5/source-spec/topic2
+topic v5/coffee/level
+topic v5/source-spec/will
+topic v5/source-spec/manualacks
+topic v5/source-spec/pendingacks
+topic v5/sink-spec/topic1
+topic v5/sink-spec/topic2
+topic v5/sink-spec/topic3
+topic v5/sink-spec/topic4
+topic v5/source-test/topic1
+topic v5/source-test/topic2
+topic v5/source-test/will
+topic v5/source-test/manualacks
+topic v5/source-test/pendingacks
+topic v5/flow-spec/topic-ack
+topic v5/flow-test/topic-ack
+topic v5/typed-flow-spec/topic1
+topic v5/untyped-flow-spec/topic1
+
user username1
topic source-spec/secure-topic1
topic source-spec/secure-topic2
topic sink-spec/secure-topic1
+
+topic v5/source-spec/secure-topic1
+topic v5/source-spec/secure-topic2
+topic v5/source-spec/secure-topic3
+topic v5/sink-spec/secure-topic1
+topic v5/sink-spec/secure-topic2
+topic v5/sink-spec/secure-topic3
diff --git a/mqtt/src/test/travis/auth.conf b/mqtt/src/test/travis/auth.conf
deleted file mode 100644
index 1a9c3dcfa..000000000
--- a/mqtt/src/test/travis/auth.conf
+++ /dev/null
@@ -1,2 +0,0 @@
-password_file /mqtt/config/conf.d/password
-acl_file /mqtt/config/conf.d/acl
diff --git a/mqtt/src/test/travis/mosquitto.conf
b/mqtt/src/test/travis/mosquitto.conf
new file mode 100644
index 000000000..2edcd7915
--- /dev/null
+++ b/mqtt/src/test/travis/mosquitto.conf
@@ -0,0 +1,13 @@
+listener 1883
+
+persistence true
+persistence_location /mosquitto/data/
+
+password_file /mosquitto/config/password
+acl_file /mosquitto/config/acl
+
+connection_messages true
+
+allow_anonymous true
+
+log_type debug
diff --git
a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala
new file mode 100644
index 000000000..f60baa4a6
--- /dev/null
+++
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.stream.connectors.mqttv5.impl
+
+import java.util.concurrent.Semaphore
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.Promise
+import scala.util.Failure
+import scala.util.Success
+import scala.util.Try
+import scala.util.control.NonFatal
+
+import org.apache.pekko.Done
+import org.apache.pekko.annotation.InternalApi
+import org.apache.pekko.stream.Shape
+import org.apache.pekko.stream._
+import org.apache.pekko.stream.connectors.mqttv5.AuthSettings
+import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings
+import org.apache.pekko.stream.connectors.mqttv5.MqttMessage
+import org.apache.pekko.stream.connectors.mqttv5.MqttOfflinePersistenceSettings
+import org.apache.pekko.stream.connectors.mqttv5.MqttQoS
+import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttMessageWithAck
+import org.apache.pekko.stream.stage._
+import org.apache.pekko.util.ByteString
+import org.eclipse.paho.mqttv5.client.DisconnectedBufferOptions
+import org.eclipse.paho.mqttv5.client.IMqttAsyncClient
+import org.eclipse.paho.mqttv5.client.IMqttToken
+import org.eclipse.paho.mqttv5.client.MqttActionListener
+import org.eclipse.paho.mqttv5.client.MqttAsyncClient
+import org.eclipse.paho.mqttv5.client.MqttCallback
+import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse
+import org.eclipse.paho.mqttv5.common.MqttException
+import org.eclipse.paho.mqttv5.common.packet.MqttProperties
+import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode
+import org.eclipse.paho.mqttv5.common.{ MqttMessage => PahoMqttMessage }
+
+/**
+ * INTERNAL API
+ *
+ * Graph stage with a flow shape that takes [[MqttMessage]]s in and emits [[MqttMessageWithAck]]s.
+ * The stage logic is an `MqttFlowStageLogic[MqttMessage]` whose `publishPending` forwards each
+ * incoming message to `publishToMqtt`.
+ *
+ * The materialized `Future[Done]` is the future of the subscription promise handed to the logic;
+ * the logic completes it once subscribing has succeeded or failed, or right after connecting
+ * when `subscriptions` is empty.
+ *
+ * @param connectionSettings broker, client id, persistence and related settings used to create the client
+ * @param subscriptions topics to subscribe to, with the QoS requested for each topic
+ * @param bufferSize passed to the logic, where it bounds the queue of received messages not yet pushed downstream
+ * @param defaultQoS passed through to the logic; presumably the QoS for published messages that do not
+ * set one (its use is not visible in this part of the file - confirm)
+ * @param manualAcks when `true` and there are subscriptions, the logic enables manual acknowledgements on the client
+ */
+@InternalApi
+private[mqttv5] final class MqttFlowStage(
+ connectionSettings: MqttConnectionSettings,
+ subscriptions: Map[String, MqttQoS],
+ bufferSize: Int,
+ defaultQoS: MqttQoS,
+ manualAcks: Boolean = false
+) extends GraphStageWithMaterializedValue[FlowShape[MqttMessage,
MqttMessageWithAck], Future[Done]] {
+
+ private val in = Inlet[MqttMessage]("MqttFlow.in")
+ private val out = Outlet[MqttMessageWithAck]("MqttFlow.out")
+ override val shape: Shape = FlowShape(in, out)
+
+ override protected def initialAttributes: Attributes =
Attributes.name("MqttFlow")
+
+ override def createLogicAndMaterializedValue(inheritedAttributes:
Attributes): (GraphStageLogic, Future[Done]) = {
+ val subscriptionPromise = Promise[Done]()
+ val logic = new MqttFlowStageLogic[MqttMessage](
+ in = in,
+ out = out,
+ shape = shape,
+ subscriptionPromise = subscriptionPromise,
+ connectionSettings = connectionSettings,
+ subscriptions = subscriptions,
+ bufferSize = bufferSize,
+ defaultQoS = defaultQoS,
+ manualAcks = manualAcks
+ ) {
+ override def publishPending(msg: MqttMessage): Unit =
super.publishToMqtt(msg)
+ }
+ (logic, subscriptionPromise.future)
+ }
+}
+
+abstract class MqttFlowStageLogic[I](
+ in: Inlet[I],
+ out: Outlet[MqttMessageWithAck],
+ shape: Shape,
+ subscriptionPromise: Promise[Done],
+ connectionSettings: MqttConnectionSettings,
+ subscriptions: Map[String, MqttQoS],
+ bufferSize: Int,
+ defaultQoS: MqttQoS,
+ manualAcks: Boolean
+) extends GraphStageLogic(shape)
+ with StageLogging
+ with InHandler
+ with OutHandler {
+
+ import MqttFlowStageLogic._
+
+ private val backpressurePahoClient = new Semaphore(bufferSize)
+ private var pendingMsg = Option.empty[I]
+ private val queue = mutable.Queue[MqttMessageWithAck]()
+ private val unackedMessages = new AtomicInteger()
+
+ protected def handleDeliveryComplete(token: IMqttToken): Unit = ()
+
+ private val onSubscribe: AsyncCallback[Try[IMqttToken]] =
getAsyncCallback[Try[IMqttToken]] { conn =>
+ if (subscriptionPromise.isCompleted) {
+ log.debug(
+ "Client [{}] re-established subscription to broker [{}]",
+ connectionSettings.clientId,
+ connectionSettings.broker
+ )
+ } else {
+ subscriptionPromise.complete(conn.map(_ => {
+ log.debug(
+ "Client [{}] established subscription to broker [{}]",
+ connectionSettings.clientId,
+ connectionSettings.broker
+ )
+ Done
+ }))
+ pull(in)
+ }
+ }
+
+ private val onConnect: AsyncCallback[IMqttAsyncClient] =
+ getAsyncCallback[IMqttAsyncClient]((client: IMqttAsyncClient) => {
+ if (subscriptions.nonEmpty) {
+ if (manualAcks) client.setManualAcks(true)
+ val (topics, qoses) = subscriptions.unzip
+ log.debug(
+ "Client [{}] connected to broker [{}]; subscribing to [{}]",
+ connectionSettings.clientId,
+ connectionSettings.broker,
+ subscriptions.map(sub =>
s"${sub._1}(qos=${sub._2.value})").mkString(", ")
+ )
+ client.subscribe(
+ topics.toArray,
+ qoses.map(_.value).toArray,
+ /* userContext */ null,
+ /* callback */ new MqttActionListener {
+ def onSuccess(token: IMqttToken): Unit = {
+ Option(token.getReasonCodes).toList.flatMap(_.toList).filter(
+ _ >= MqttReturnCode.RETURN_CODE_UNSPECIFIED_ERROR).distinct
match {
+ case Nil =>
+ onSubscribe.invoke(Success(token))
+
+ case errors =>
+ val message = s"Client [${connectionSettings.clientId}]
received one or more errors " +
+ s"while subscribing to broker
[${connectionSettings.broker}]: " +
+ s"[${errors.map(e =>
s"code=${e.toString}").mkString(",")}]"
+ log.error(message)
+ onSubscribe.invoke(Failure(new RuntimeException(message)))
+ }
+ }
+
+ def onFailure(token: IMqttToken, ex: Throwable): Unit =
+ onSubscribe.invoke(Failure(ex))
+ }
+ )
+ } else {
+ log.debug(
+ "Client [{}] connected to broker [{}] without subscriptions",
+ connectionSettings.clientId,
+ connectionSettings.broker
+ )
+ subscriptionPromise.complete(SuccessfullyDone)
+ pull(in)
+ }
+ })
+
+ private val onConnectionLost: AsyncCallback[Throwable] =
getAsyncCallback[Throwable](failStageWith)
+
+ private val onMessageAsyncCallback: AsyncCallback[MqttMessageWithAck] =
+ getAsyncCallback[MqttMessageWithAck] { message =>
+ if (isAvailable(out)) {
+ pushDownstream(message)
+ } else if (queue.size + 1 > bufferSize) {
+ failStageWith(new RuntimeException(s"Reached maximum buffer size
[$bufferSize]"))
+ } else {
+ queue.enqueue(message)
+ }
+ }
+
+ private val onPublished: AsyncCallback[Try[IMqttToken]] =
getAsyncCallback[Try[IMqttToken]] {
+ case Success(_) => if (!hasBeenPulled(in)) pull(in)
+ case Failure(ex) => failStageWith(ex)
+ }
+
+ private def createPahoBufferOptions(settings:
MqttOfflinePersistenceSettings): DisconnectedBufferOptions = {
+ val disconnectedBufferOptions = new DisconnectedBufferOptions()
+
+ disconnectedBufferOptions.setBufferEnabled(true)
+ disconnectedBufferOptions.setBufferSize(settings.bufferSize)
+
disconnectedBufferOptions.setDeleteOldestMessages(settings.deleteOldestMessage)
+ disconnectedBufferOptions.setPersistBuffer(settings.persistBuffer)
+
+ disconnectedBufferOptions
+ }
+
+ private val client = new MqttAsyncClient(
+ connectionSettings.broker,
+ connectionSettings.clientId,
+ connectionSettings.persistence
+ )
+
+ private def mqttClient: MqttAsyncClient =
connectionSettings.offlinePersistence match {
+ case Some(bufferOpts) =>
+ client.setBufferOpts(createPahoBufferOptions(bufferOpts))
+ client
+
+ case _ =>
+ client
+ }
+
+ private val commitCallback: AsyncCallback[CommitCallbackArguments] =
+ getAsyncCallback[CommitCallbackArguments]((args: CommitCallbackArguments)
=>
+ try {
+ mqttClient.messageArrivedComplete(args.messageId, args.qos.value)
+ if (unackedMessages.decrementAndGet() == 0 && (isClosed(out) ||
(isClosed(in) && queue.isEmpty)))
+ completeStage()
+ args.promise.complete(SuccessfullyDone)
+ } catch {
+ case ex: Throwable => args.promise.failure(ex)
+ }
+ )
+
+ mqttClient.setCallback(
+ new MqttCallback {
+ override def messageArrived(topic: String, pahoMessage:
PahoMqttMessage): Unit = {
+ backpressurePahoClient.acquire()
+ val message = new MqttMessageWithAck {
+ override val message: MqttMessage = MqttMessage(topic,
ByteString.fromArrayUnsafe(pahoMessage.getPayload))
+
+ override def ack(): Future[Done] = {
+ val promise = Promise[Done]()
+ val qos = pahoMessage.getQos match {
+ case 0 => MqttQoS.AtMostOnce
+ case 1 => MqttQoS.AtLeastOnce
+ case 2 => MqttQoS.ExactlyOnce
+ }
+ commitCallback.invoke(CommitCallbackArguments(pahoMessage.getId,
qos, promise))
+ promise.future
+ }
+ }
+ onMessageAsyncCallback.invoke(message)
+ }
+
+ override def deliveryComplete(token: IMqttToken): Unit =
+ handleDeliveryComplete(token)
+
+ override def disconnected(disconnectResponse: MqttDisconnectResponse):
Unit = {
+ if (!connectionSettings.automaticReconnect) {
+ log.error(
+ "Client [{}] lost connection to broker [{}] with
[code={},reason={}]; " +
+ "(hint: `automaticReconnect` can be enabled in
`MqttConnectionSettings`)",
+ connectionSettings.clientId,
+ connectionSettings.broker,
+ disconnectResponse.getReturnCode,
+ disconnectResponse.getReasonString
+ )
+ onConnectionLost.invoke(disconnectResponse.getException)
+ } else {
+ log.warning(
+ "Client [{}] lost connection to broker [{}] with
[code={},reason={}]; trying to reconnect...",
+ connectionSettings.clientId,
+ connectionSettings.broker,
+ disconnectResponse.getReturnCode,
+ disconnectResponse.getReasonString
+ )
+ }
+ }
+
+ override def mqttErrorOccurred(exception: MqttException): Unit =
+ failStageWith(exception)
+
+ /**
+ * Handles an AUTH packet received from the broker.
+ *
+ * The behaviour depends on the configured `connectionSettings.auth` and on `reasonCode`:
+ *  - enhanced authentication with code `0x00`: only logs that authentication completed;
+ *  - enhanced authentication with code `0x18`: asks the configured auth packet handler for
+ *    the response and sends it via `mqttClient.authenticate`;
+ *  - anything else: logs a warning and takes no further action.
+ *
+ * @param reasonCode reason code carried by the AUTH packet
+ * @param properties properties of the AUTH packet; may be `null` (it is wrapped in `Option`)
+ */
+ override def authPacketArrived(reasonCode: Int, properties:
MqttProperties): Unit = {
+ connectionSettings.auth match {
+ case AuthSettings.Enhanced(_, _, _) if reasonCode == 0x00 =>
+ // (re)authentication successful; no further action needed
+ log.debug(
+ "Authentication for client [{}] completed successfully with
[codes={},reason={}]",
+ connectionSettings.clientId,
+ reasonCode,
+ Option(properties).flatMap(props =>
Option(props.getReasonString)).orNull
+ )
+
+ case AuthSettings.Enhanced(_, _, authPacketHandler) if reasonCode ==
0x18 =>
+ // continue authentication
+ log.debug(
+ "Authentication for client [{}] continuing with
[codes={},reason={}]",
+ connectionSettings.clientId,
+ reasonCode,
+ Option(properties).flatMap(props =>
Option(props.getReasonString)).orNull
+ )
+
+ // the user-supplied handler decides the reason code and properties of the reply
+ val (responseCode, responseProperties) =
authPacketHandler(reasonCode, properties)
+
+ val result = mqttClient.authenticate(
+ /* reasonCode */ responseCode,
+ /* userContext */ null,
+ /* properties */ responseProperties
+ )
+
+ // `result` may be null, hence the Option wrapper
+ Option(result).foreach { token =>
+ // the API docs say that a token for the operation is returned
but the current
+ // implementation in
`org.eclipse.paho.mqttv5.client.MqttAsyncClient` actually
+ // returns `null`; in case this changes in the future, here we
log the results
+ // of the operation and hope for the best
+ token.setActionCallback(
+ new MqttActionListener {
+ override def onSuccess(token: IMqttToken): Unit =
+ log.debug(
+ "Authentication call for client [{}] completed with
[codes={},reason={}]",
+ connectionSettings.clientId,
+
Option(token.getReasonCodes).toList.flatMap(_.toList).distinct.sorted.mkString(";"),
+ Option(token.getResponseProperties).flatMap(props =>
Option(props.getReasonString)).orNull
+ )
+
+ override def onFailure(token: IMqttToken, ex: Throwable):
Unit =
+ log.debug(
+ "Authentication call for client [{}] failed with
[codes={},reason={}]: [{}]",
+ connectionSettings.clientId,
+
Option(token.getReasonCodes).toList.flatMap(_.toList).distinct.sorted.mkString(";"),
+ Option(token.getResponseProperties).flatMap(props =>
Option(props.getReasonString)).orNull,
+ s"${ex.getClass.getSimpleName} - ${ex.getMessage}"
+ )
+ }
+ )
+ }
+
+ case other =>
+ // unexpected reason code received (if enhanced authentication is
used)
+ // OR
+ // unexpected AUTH packet received (if no or simple authentication
is used)
+ // the auth type is logged as the lower-cased, alphanumeric-only simple class name
+ log.warning(
+ "Client [{}] with authentication type [{}] received an
unexpected AUTH packet with [code={},reason={}]",
+ connectionSettings.clientId,
+ other.getClass.getSimpleName.replaceAll("[^a-zA-Z0-9]",
"").toLowerCase,
+ reasonCode,
+ Option(properties).flatMap(props =>
Option(props.getReasonString)).orNull
+ )
+ }
+ }
+
+ /**
+ * Called when a connection to the broker has been established.
+ *
+ * Publishes the pending message, if any, and clears it; after a reconnect it also
+ * pulls the inlet unless it has already been pulled.
+ *
+ * @param reconnect whether this connection is a reconnect
+ * @param serverURI URI of the broker that was connected to
+ */
+ override def connectComplete(reconnect: Boolean, serverURI: String):
Unit = {
+ log.debug(
+ "Connection completed for client [{}] with
[reconnect={},serverURI={}]",
+ connectionSettings.clientId,
+ reconnect,
+ serverURI
+ )
+ // `pendingMsg` is set by `onPush` when publishing failed while automatic reconnect is enabled
+ pendingMsg.foreach { msg =>
+ log.debug("Client [{}] sending pending message to broker [{}]",
connectionSettings.clientId, serverURI)
+ publishPending(msg)
+ // cleared only after `publishPending` returned without throwing
+ pendingMsg = None
+ }
+ if (reconnect && !hasBeenPulled(in)) pull(in)
+ }
+ }
+ )
+
+ /**
+  * Grabs the next upstream element and hands it to `publishPending`.
+  *
+  * If publishing throws an `MqttException` while automatic reconnect is enabled, the
+  * element is parked in `pendingMsg`; every other non-fatal failure is rethrown unchanged.
+  */
+ override def onPush(): Unit = {
+   val element = grab(in)
+   try publishPending(element)
+   catch {
+     case _: MqttException if connectionSettings.automaticReconnect =>
+       // keep the element so that `connectComplete` can publish it later
+       pendingMsg = Some(element)
+     case NonFatal(other) =>
+       throw other
+   }
+ }
+
+ /**
+  * Upstream completed: keep the stage alive, and only delegate to the default handling
+  * when no message is queued and no acknowledgement is outstanding.
+  */
+ override def onUpstreamFinish(): Unit = {
+   setKeepGoing(true)
+   val nothingOutstanding = queue.isEmpty && unackedMessages.get() == 0
+   if (nothingOutstanding) {
+     super.onUpstreamFinish()
+   }
+ }
+
+ /**
+  * Upstream failed: keep the stage alive, and only delegate to the default handling
+  * when no message is queued and no acknowledgement is outstanding.
+  */
+ override def onUpstreamFailure(ex: Throwable): Unit = {
+   setKeepGoing(true)
+   val nothingOutstanding = queue.isEmpty && unackedMessages.get() == 0
+   if (nothingOutstanding) {
+     super.onUpstreamFailure(ex)
+   }
+ }
+
+ /**
+  * Downstream demand: emit the oldest queued message, if there is one, and complete the
+  * stage when nothing is left unacknowledged and the inlet is already closed.
+  */
+ override def onPull(): Unit = {
+   if (queue.nonEmpty) {
+     val next = queue.dequeue()
+     pushDownstream(next)
+     val finished = unackedMessages.get() == 0 && isClosed(in)
+     if (finished) completeStage()
+   }
+ }
+
+ /**
+  * Downstream cancelled: keep the stage alive, and only delegate to the default handling
+  * when no acknowledgement is outstanding.
+  */
+ override def onDownstreamFinish(cause: Throwable): Unit = {
+   setKeepGoing(true)
+   val allAcknowledged = unackedMessages.get() == 0
+   if (allAcknowledged) {
+     super.onDownstreamFinish(cause)
+   }
+ }
+
+ setHandlers(in, out, this)
+
+ /**
+ * Publishes `msg` to its topic through the MQTT client.
+ *
+ * The outcome is reported through `onPublished`: success when the token carries no reason
+ * code at or above `MqttReturnCode.RETURN_CODE_UNSPECIFIED_ERROR`, failure otherwise or
+ * when the client reports a failure.
+ *
+ * @param msg message to publish; its QoS falls back to `defaultQoS` when not set
+ * @return the token returned by the client's `publish` call
+ */
+ def publishToMqtt(msg: MqttMessage): IMqttToken = {
+ val pahoMsg = new PahoMqttMessage(msg.payload.toArray)
+ pahoMsg.setQos(msg.qos.getOrElse(defaultQoS).value)
+ pahoMsg.setRetained(msg.retained)
+
+ mqttClient.publish(
+ msg.topic,
+ pahoMsg,
+ // NOTE(review): presumably the user-context argument of `publish` - confirm against the Paho API
+ msg,
+ /* callback */ new MqttActionListener {
+ def onSuccess(token: IMqttToken): Unit = {
+ // `getReasonCodes` may be null; keep only the distinct codes that signal an error
+ Option(token.getReasonCodes).toList.flatMap(_.toList).filter(
+ _ >= MqttReturnCode.RETURN_CODE_UNSPECIFIED_ERROR).distinct match {
+ case Nil =>
+ onPublished.invoke(Success(token))
+
+ case errors =>
+ val message = s"Client [${connectionSettings.clientId}] received
one or more error codes " +
+ s"while publishing on topic [${msg.topic}] to broker
[${connectionSettings.broker}]: " +
+ s"[${errors.map(e => s"code=${e.toString}").mkString(",")}]"
+ log.error(message)
+ onPublished.invoke(Failure(new RuntimeException(message)))
+ }
+ }
+
+ def onFailure(token: IMqttToken, ex: Throwable): Unit =
+ onPublished.invoke(Failure(ex))
+ }
+ )
+ }
+
+ /**
+  * Hook called for every element that has to be published (see `onPush` and
+  * `connectComplete`). This default implementation does nothing.
+  */
+ def publishPending(msg: I): Unit = {}
+
+ /**
+  * Emits `message` on the outlet, releases one permit of `backpressurePahoClient` and,
+  * when manual acknowledgements are enabled, counts the message as unacknowledged.
+  */
+ private def pushDownstream(message: MqttMessageWithAck): Unit = {
+   push(out, message)
+   backpressurePahoClient.release()
+   if (manualAcks) {
+     unackedMessages.incrementAndGet()
+   }
+ }
+
+ /** Fails the subscription promise (if still open) and then the stage itself with `ex`. */
+ private def failStageWith(ex: Throwable): Unit = {
+   subscriptionPromise.tryFailure(ex)
+   failStage(ex)
+ }
+
+ /**
+ * Starts connecting the MQTT client when the stage starts.
+ *
+ * The connect outcome is forwarded through `onConnect` / `onConnectionLost`; anything
+ * thrown by the `connect` call itself fails the stage.
+ */
+ override def preStart(): Unit =
+ try {
+ mqttClient.connect(
+ connectionSettings.asMqttConnectionOptions(),
+ /* userContext */ null,
+ /* callback */ new MqttActionListener {
+ override def onSuccess(v: IMqttToken): Unit =
onConnect.invoke(mqttClient)
+ override def onFailure(asyncActionToken: IMqttToken, ex: Throwable):
Unit = onConnectionLost.invoke(ex)
+ }
+ )
+ } catch {
+ // NOTE(review): this catches every Throwable, including fatal errors; consider NonFatal
+ case e: Throwable => failStageWith(e)
+ }
+
+ /**
+  * Cleans up when the stage stops.
+  *
+  * Fails the subscription promise if it is still open, then disconnects the client on a
+  * best-effort basis. When the regular disconnect fails, a forced disconnect is attempted;
+  * the client is closed afterwards in every case, including when the forced disconnect
+  * itself throws.
+  */
+ override def postStop(): Unit = {
+   if (!subscriptionPromise.isCompleted) {
+     subscriptionPromise
+       .tryFailure(
+         new IllegalStateException(
+ "Cannot complete subscription because the stage is about to stop
or fail"
+         )
+       )
+   }
+
+   try {
+     log.debug(
+       "Stage stopped, disconnecting client [{}] from broker [{}]",
+       connectionSettings.clientId,
+       connectionSettings.broker
+     )
+
+     mqttClient.disconnect(
+       /* quiesceTimeout */ connectionSettings.disconnect.quiesceTimeout.toMillis,
+       /* userContext */ null,
+       /* callback */ new MqttActionListener {
+         override def onSuccess(asyncActionToken: IMqttToken): Unit =
+           mqttClient.close()
+
+         override def onFailure(asyncActionToken: IMqttToken, ex: Throwable): Unit =
+           try {
+             mqttClient.disconnectForcibly(
+               /* quiesceTimeout */ 0L, // we already quiesced in `disconnect`
+               /* disconnectTimeout */ connectionSettings.disconnect.timeout.toMillis,
+               /* sendDisconnectPacket */ connectionSettings.disconnect.sendDisconnectPacket
+             )
+           } finally {
+             // still try to release the client when the forced disconnect throws
+             mqttClient.close()
+           }
+       },
+       /* reasonCode */ MqttReturnCode.RETURN_CODE_SUCCESS,
+       /* disconnectProperties */ new MqttProperties()
+     )
+   } catch {
+     // disconnect is "best effort"; ignore if it fails
+     case _: MqttException =>
+       try {
+         mqttClient.close()
+       } catch {
+         case _: MqttException => () // do nothing
+       }
+   }
+ }
+}
+
+/**
+ * INTERNAL API
+ *
+ * Shared constants and helper types of the MQTT flow stage logic.
+ */
+@InternalApi
+private[mqttv5] object MqttFlowStageLogic {
+
+  // bundles a message id, its QoS and the promise associated with it
+  final private case class CommitCallbackArguments(messageId: Int, qos: MqttQoS, promise: Promise[Done])
+
+  // reusable successful result
+  private val SuccessfullyDone = Success(Done)
+}
diff --git
a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStageWithAck.scala
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStageWithAck.scala
new file mode 100644
index 000000000..cc94e056a
--- /dev/null
+++
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStageWithAck.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.stream.connectors.mqttv5.impl
+
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.Promise
+
+import org.apache.pekko.Done
+import org.apache.pekko.annotation.InternalApi
+import org.apache.pekko.stream._
+import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings
+import org.apache.pekko.stream.connectors.mqttv5.MqttQoS
+import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttMessageWithAck
+import org.apache.pekko.stream.stage._
+import org.eclipse.paho.mqttv5.client.IMqttToken
+
+/**
+ * INTERNAL API
+ *
+ * Flow stage whose inlet and outlet both carry [[MqttMessageWithAck]]. Its behaviour is
+ * implemented by `MqttFlowWithAckStageLogic`; the materialized value is the future of the
+ * subscription promise handed to that logic.
+ */
+@InternalApi
+private[mqttv5] final class MqttFlowStageWithAck(
+    connectionSettings: MqttConnectionSettings,
+    subscriptions: Map[String, MqttQoS],
+    bufferSize: Int,
+    defaultQoS: MqttQoS,
+    manualAcks: Boolean = false)
+    extends GraphStageWithMaterializedValue[FlowShape[MqttMessageWithAck, MqttMessageWithAck], Future[Done]] {
+
+  private val in = Inlet[MqttMessageWithAck]("MqttFlow.in")
+  private val out = Outlet[MqttMessageWithAck]("MqttFlow.out")
+  override val shape: Shape = FlowShape(in, out)
+
+  override protected def initialAttributes: Attributes = Attributes.name("MqttFlow")
+
+  override def createLogicAndMaterializedValue(
+      inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
+    // one promise per materialization; its future is the materialized value
+    val subscribed = Promise[Done]()
+    val stageLogic =
+      new MqttFlowWithAckStageLogic(
+        in = in,
+        out = out,
+        shape = shape,
+        subscriptionPromise = subscribed,
+        connectionSettings = connectionSettings,
+        subscriptions = subscriptions,
+        bufferSize = bufferSize,
+        defaultQoS = defaultQoS,
+        manualAcks = manualAcks
+      )
+    stageLogic -> subscribed.future
+  }
+}
+
+/**
+ * Stage logic for flows whose input elements are [[MqttMessageWithAck]].
+ *
+ * Every published element is remembered under the message id of its publish token; when
+ * delivery of that message id completes, the element's `ack()` is invoked and the entry
+ * is dropped.
+ *
+ * NOTE(review): unlike the stage that creates it, this class is neither `@InternalApi` nor
+ * package-private - confirm whether it is meant to be public.
+ */
+class MqttFlowWithAckStageLogic(
+    in: Inlet[MqttMessageWithAck],
+    out: Outlet[MqttMessageWithAck],
+    shape: Shape,
+    subscriptionPromise: Promise[Done],
+    connectionSettings: MqttConnectionSettings,
+    subscriptions: Map[String, MqttQoS],
+    bufferSize: Int,
+    defaultQoS: MqttQoS,
+    manualAcks: Boolean
+) extends MqttFlowStageLogic[MqttMessageWithAck](
+      in = in,
+      out = out,
+      shape = shape,
+      subscriptionPromise = subscriptionPromise,
+      connectionSettings = connectionSettings,
+      subscriptions = subscriptions,
+      bufferSize = bufferSize,
+      defaultQoS = defaultQoS,
+      manualAcks = manualAcks
+    ) {
+
+  // published elements still waiting for their delivery confirmation, keyed by message id
+  private val messagesToAck: mutable.HashMap[Int, MqttMessageWithAck] = mutable.HashMap()
+
+  /** Acknowledges and forgets the element published under the token's message id, if any. */
+  override def handleDeliveryComplete(token: IMqttToken): Unit =
+    // single lookup; the entry is removed even if `ack()` throws
+    messagesToAck.remove(token.getMessageId).foreach(_.ack())
+
+  /** Publishes the wrapped message and remembers the element until delivery completes. */
+  override def publishPending(msg: MqttMessageWithAck): Unit = {
+    val publish = publishToMqtt(msg.message)
+    messagesToAck.update(publish.getMessageId, msg)
+  }
+
+}
diff --git
a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/javadsl/MqttFlow.scala
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/javadsl/MqttFlow.scala
new file mode 100644
index 000000000..aa4635ebf
--- /dev/null
+++
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/javadsl/MqttFlow.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.stream.connectors.mqttv5.javadsl
+
+import java.util.concurrent.CompletionStage
+
+import org.apache.pekko.Done
+import org.apache.pekko.stream.connectors.mqttv5._
+import org.apache.pekko.stream.javadsl.Flow
+import org.apache.pekko.util.FutureConverters._
+
+/**
+ * Java API
+ *
+ * Factory for flows that publish to, and subscribe from, an MQTT broker.
+ */
+object MqttFlow {
+
+  /**
+   * Create a flow to send messages to MQTT AND subscribe to MQTT messages (without a commit handle).
+   *
+   * The materialized value completes on successful connection to the MQTT broker.
+   *
+   * @param bufferSize max number of messages read from MQTT before back-pressure applies
+   * @param defaultQos Quality of service level applied for messages not specifying a message specific value
+   */
+  def atMostOnce(
+      settings: MqttConnectionSettings,
+      subscriptions: MqttSubscriptions,
+      bufferSize: Int,
+      defaultQos: MqttQoS): Flow[MqttMessage, MqttMessage, CompletionStage[Done]] = {
+    val underlying = scaladsl.MqttFlow.atMostOnce(settings, subscriptions, bufferSize, defaultQos)
+    underlying.mapMaterializedValue(_.asJava).asJava
+  }
+
+  /**
+   * Create a flow to send messages to MQTT AND subscribe to MQTT messages with a commit handle
+   * to acknowledge message reception.
+   *
+   * The materialized value completes on successful connection to the MQTT broker.
+   *
+   * @param bufferSize max number of messages read from MQTT before back-pressure applies
+   * @param defaultQos Quality of service level applied for messages not specifying a message specific value
+   */
+  def atLeastOnce(
+      settings: MqttConnectionSettings,
+      subscriptions: MqttSubscriptions,
+      bufferSize: Int,
+      defaultQos: MqttQoS): Flow[MqttMessage, MqttMessageWithAck, CompletionStage[Done]] = {
+    val underlying = scaladsl.MqttFlow.atLeastOnce(settings, subscriptions, bufferSize, defaultQos)
+    underlying
+      .map(MqttMessageWithAck.toJava)
+      .mapMaterializedValue(_.asJava)
+      .asJava
+  }
+
+  /**
+   * Create a flow to send messages that carry an acknowledge handle to MQTT AND subscribe to
+   * MQTT messages with a commit handle to acknowledge message reception.
+   *
+   * The materialized value completes on successful connection to the MQTT broker.
+   *
+   * @param bufferSize max number of messages read from MQTT before back-pressure applies
+   * @param defaultQos Quality of service level applied for messages not specifying a message specific value
+   */
+  def atLeastOnceWithAck(
+      settings: MqttConnectionSettings,
+      subscriptions: MqttSubscriptions,
+      bufferSize: Int,
+      defaultQos: MqttQoS): Flow[MqttMessageWithAck, MqttMessageWithAck, CompletionStage[Done]] = {
+    val underlying =
+      scaladsl.MqttFlow.atLeastOnceWithAckForJava(settings, subscriptions, bufferSize, defaultQos)
+    underlying
+      .map(MqttMessageWithAck.toJava)
+      .mapMaterializedValue(_.asJava)
+      .asJava
+  }
+}
diff --git
a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/javadsl/MqttMessageWithAck.scala
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/javadsl/MqttMessageWithAck.scala
new file mode 100644
index 000000000..8559b1329
--- /dev/null
+++
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/javadsl/MqttMessageWithAck.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.stream.connectors.mqttv5.javadsl
+
+import java.util.concurrent.CompletionStage
+
+import org.apache.pekko.Done
+import org.apache.pekko.annotation.InternalApi
+import org.apache.pekko.stream.connectors.mqttv5.MqttMessage
+import org.apache.pekko.stream.connectors.mqttv5.scaladsl
+import org.apache.pekko.util.FutureConverters._
+
+/**
+ * Java API
+ *
+ * An MQTT message together with a handle to acknowledge its reception to MQTT.
+ */
+sealed trait MqttMessageWithAck {
+
+ /**
+ * The message received from MQTT.
+ */
+ val message: MqttMessage
+
+ /**
+ * Signals `messageArrivedComplete` to MQTT.
+ *
+ * @return a completion stage indicating whether the acknowledgement reached MQTT
+ */
+ def ack(): CompletionStage[Done]
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[javadsl] object MqttMessageWithAck {
+
+  /** Adapts a Scala DSL message to the Java DSL type; `ack()` is exposed as a `CompletionStage`. */
+  def toJava(cm: scaladsl.MqttMessageWithAck): MqttMessageWithAck =
+    new MqttMessageWithAck {
+      override val message: MqttMessage = cm.message
+
+      override def ack(): CompletionStage[Done] = cm.ack().asJava
+    }
+}
+
+// Abstract base class extending the sealed `MqttMessageWithAck` trait.
+// NOTE(review): presumably lets code outside this file provide implementations of the
+// sealed trait - confirm.
+abstract class MqttMessageWithAckImpl extends MqttMessageWithAck
diff --git
a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/javadsl/MqttSink.scala
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/javadsl/MqttSink.scala
new file mode 100644
index 000000000..f849d3b58
--- /dev/null
+++
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/javadsl/MqttSink.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.stream.connectors.mqttv5.javadsl
+
+import java.util.concurrent.CompletionStage
+
+import org.apache.pekko.Done
+import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings
+import org.apache.pekko.stream.connectors.mqttv5.MqttMessage
+import org.apache.pekko.stream.connectors.mqttv5.MqttQoS
+import org.apache.pekko.stream.connectors.mqttv5.MqttSubscriptions
+import org.apache.pekko.stream.javadsl.Keep
+import org.apache.pekko.stream.javadsl.Sink
+
+/**
+ * Java API
+ *
+ * Factory for sinks that publish messages to MQTT.
+ */
+object MqttSink {
+
+  /**
+   * Create a sink sending messages to MQTT.
+   *
+   * The materialized value completes on stream completion.
+   *
+   * @param defaultQos Quality of service level applied for messages not specifying a message specific value
+   */
+  def create(
+      connectionSettings: MqttConnectionSettings,
+      defaultQos: MqttQoS): Sink[MqttMessage, CompletionStage[Done]] = {
+    // publish-only flow: no subscriptions, and whatever comes out of it is ignored
+    val publishOnly =
+      MqttFlow.atMostOnce(connectionSettings, MqttSubscriptions.empty, bufferSize = 0, defaultQos)
+    publishOnly.toMat(Sink.ignore[MqttMessage](), Keep.right[CompletionStage[Done], CompletionStage[Done]])
+  }
+}
diff --git
a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/javadsl/MqttSource.scala
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/javadsl/MqttSource.scala
new file mode 100644
index 000000000..9be45fa37
--- /dev/null
+++
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/javadsl/MqttSource.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.stream.connectors.mqttv5.javadsl
+
+import java.util.concurrent.CompletionStage
+
+import org.apache.pekko.Done
+import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings
+import org.apache.pekko.stream.connectors.mqttv5.MqttMessage
+import org.apache.pekko.stream.connectors.mqttv5.MqttSubscriptions
+import org.apache.pekko.stream.connectors.mqttv5.scaladsl
+import org.apache.pekko.stream.javadsl.Source
+import org.apache.pekko.util.FutureConverters._
+
+/**
+ * Java API
+ *
+ * Factory for sources that subscribe to MQTT messages.
+ */
+object MqttSource {
+
+  /**
+   * Create a source subscribing to MQTT messages (without a commit handle).
+   *
+   * The materialized value completes on successful connection to the MQTT broker.
+   *
+   * @param bufferSize max number of messages read from MQTT before back-pressure applies
+   */
+  def atMostOnce(
+      settings: MqttConnectionSettings,
+      subscriptions: MqttSubscriptions,
+      bufferSize: Int): Source[MqttMessage, CompletionStage[Done]] = {
+    val underlying = scaladsl.MqttSource.atMostOnce(settings, subscriptions, bufferSize)
+    underlying.mapMaterializedValue(_.asJava).asJava
+  }
+
+  /**
+   * Create a source subscribing to MQTT messages with a commit handle to acknowledge message reception.
+   *
+   * The materialized value completes on successful connection to the MQTT broker.
+   *
+   * @param bufferSize max number of messages read from MQTT before back-pressure applies
+   */
+  def atLeastOnce(
+      settings: MqttConnectionSettings,
+      subscriptions: MqttSubscriptions,
+      bufferSize: Int): Source[MqttMessageWithAck, CompletionStage[Done]] = {
+    val underlying = scaladsl.MqttSource.atLeastOnce(settings, subscriptions, bufferSize)
+    underlying
+      .map(MqttMessageWithAck.toJava)
+      .mapMaterializedValue(_.asJava)
+      .asJava
+  }
+}
diff --git
a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
new file mode 100644
index 000000000..106488b16
--- /dev/null
+++
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.stream.connectors.mqttv5
+
+import org.apache.pekko
+
+/**
+ * An MQTT message: topic, payload, optional quality of service and the retained flag.
+ *
+ * Instances are created through the companion object; the `with...` methods return a
+ * modified copy and leave the receiver untouched.
+ */
+final class MqttMessage private (
+    val topic: String,
+    val payload: pekko.util.ByteString,
+    val qos: Option[MqttQoS],
+    val retained: Boolean) {
+
+  def withTopic(value: String): MqttMessage = copy(topic = value)
+
+  def withPayload(value: pekko.util.ByteString): MqttMessage = copy(payload = value)
+
+  def withPayload(value: Array[Byte]): MqttMessage = copy(payload = pekko.util.ByteString(value))
+
+  /** A `null` value results in no message-specific QoS. */
+  def withQos(value: MqttQoS): MqttMessage = copy(qos = Option(value))
+
+  /** Returns this very instance when the flag already has the requested value. */
+  def withRetained(value: Boolean): MqttMessage =
+    if (value != retained) copy(retained = value) else this
+
+  private def copy(
+      topic: String = topic,
+      payload: pekko.util.ByteString = payload,
+      qos: Option[MqttQoS] = qos,
+      retained: Boolean = retained): MqttMessage =
+    new MqttMessage(topic, payload, qos, retained)
+
+  override def toString: String =
+    s"""MqttMessage(topic=$topic,payload=$payload,qos=$qos,retained=$retained)"""
+
+  // two messages are equal when all four fields are equal (null-safe comparison)
+  override def equals(other: Any): Boolean = other match {
+    case that: MqttMessage =>
+      topic == that.topic &&
+      payload == that.payload &&
+      qos == that.qos &&
+      retained == that.retained
+    case _ => false
+  }
+
+  override def hashCode(): Int =
+    java.util.Objects.hash(topic, payload, qos, Boolean.box(retained))
+}
+
+object MqttMessage {
+
+  /**
+   * Scala API
+   *
+   * Creates a message without a message-specific QoS and with the retained flag off.
+   */
+  def apply(topic: String, payload: pekko.util.ByteString): MqttMessage =
+    new MqttMessage(topic, payload, qos = None, retained = false)
+
+  /**
+   * Java API
+   *
+   * Creates a message without a message-specific QoS and with the retained flag off.
+   */
+  def create(topic: String, payload: pekko.util.ByteString): MqttMessage =
+    apply(topic, payload)
+}
diff --git
a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/scaladsl/MqttFlow.scala
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/scaladsl/MqttFlow.scala
new file mode 100644
index 000000000..e8b10e7d7
--- /dev/null
+++
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/scaladsl/MqttFlow.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.stream.connectors.mqttv5.scaladsl
+
+import scala.concurrent.Future
+
+import org.apache.pekko.Done
+import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings
+import org.apache.pekko.stream.connectors.mqttv5.MqttMessage
+import org.apache.pekko.stream.connectors.mqttv5.MqttQoS
+import org.apache.pekko.stream.connectors.mqttv5.MqttSubscriptions
+import org.apache.pekko.stream.connectors.mqttv5.impl.MqttFlowStage
+import org.apache.pekko.stream.connectors.mqttv5.impl.MqttFlowStageWithAck
+import org.apache.pekko.stream.connectors.mqttv5.javadsl
+import org.apache.pekko.stream.scaladsl.Flow
+import org.apache.pekko.stream.scaladsl.Keep
+
+/**
+ * Scala API
+ *
+ * Factory for flows that publish to, and subscribe from, an MQTT broker.
+ */
+object MqttFlow {
+
+  /**
+   * Create a flow to send messages to MQTT AND subscribe to MQTT messages (without a commit handle).
+   *
+   * The materialized value completes on successful connection to the MQTT broker.
+   *
+   * @param bufferSize max number of messages read from MQTT before back-pressure applies
+   * @param defaultQos Quality of service level applied for messages not specifying a message specific value
+   */
+  def atMostOnce(
+      connectionSettings: MqttConnectionSettings,
+      subscriptions: MqttSubscriptions,
+      bufferSize: Int,
+      defaultQos: MqttQoS): Flow[MqttMessage, MqttMessage, Future[Done]] = {
+    val stage = new MqttFlowStage(connectionSettings, subscriptions.subscriptions, bufferSize, defaultQos)
+    // drop the acknowledge handle, only the plain message is emitted
+    Flow.fromGraph(stage).map(_.message)
+  }
+
+  /**
+   * Create a flow to send messages to MQTT AND subscribe to MQTT messages with a commit handle
+   * to acknowledge message reception.
+   *
+   * The materialized value completes on successful connection to the MQTT broker.
+   *
+   * @param bufferSize max number of messages read from MQTT before back-pressure applies
+   * @param defaultQos Quality of service level applied for messages not specifying a message specific value
+   */
+  def atLeastOnce(
+      connectionSettings: MqttConnectionSettings,
+      subscriptions: MqttSubscriptions,
+      bufferSize: Int,
+      defaultQos: MqttQoS): Flow[MqttMessage, MqttMessageWithAck, Future[Done]] = {
+    val stage =
+      new MqttFlowStage(connectionSettings, subscriptions.subscriptions, bufferSize, defaultQos, manualAcks = true)
+    Flow.fromGraph(stage)
+  }
+
+  /**
+   * Create a flow to send messages to MQTT AND subscribe to MQTT messages with a commit handle
+   * to acknowledge message reception.
+   *
+   * Both the messages sent into the flow and the messages emitted by it carry an
+   * acknowledge handle.
+   *
+   * The materialized value completes on successful connection to the MQTT broker.
+   *
+   * @param bufferSize max number of messages read from MQTT before back-pressure applies
+   * @param defaultQos Quality of service level applied for messages not specifying a message specific value
+   */
+  def atLeastOnceWithAck(
+      connectionSettings: MqttConnectionSettings,
+      subscriptions: MqttSubscriptions,
+      bufferSize: Int,
+      defaultQos: MqttQoS): Flow[MqttMessageWithAck, MqttMessageWithAck, Future[Done]] = {
+    val stage = new MqttFlowStageWithAck(
+      connectionSettings,
+      subscriptions.subscriptions,
+      bufferSize,
+      defaultQos,
+      manualAcks = true)
+    Flow.fromGraph(stage)
+  }
+
+  /**
+   * Variant of [[atLeastOnceWithAck]] whose input elements are Java DSL messages; they are
+   * converted with `MqttMessageWithAck.fromJava` before they reach the MQTT stage.
+   *
+   * The materialized value is the one of [[atLeastOnceWithAck]].
+   */
+  def atLeastOnceWithAckForJava(
+      connectionSettings: MqttConnectionSettings,
+      subscriptions: MqttSubscriptions,
+      bufferSize: Int,
+      defaultQos: MqttQoS): Flow[javadsl.MqttMessageWithAck, MqttMessageWithAck, Future[Done]] = {
+    val withAck = atLeastOnceWithAck(connectionSettings, subscriptions, bufferSize, defaultQos)
+    Flow
+      .fromFunction(MqttMessageWithAck.fromJava)
+      .viaMat(withAck)(Keep.right)
+  }
+
+}
diff --git
a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/scaladsl/MqttMessageWithAck.scala
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/scaladsl/MqttMessageWithAck.scala
new file mode 100644
index 000000000..cbf2b4398
--- /dev/null
+++
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/scaladsl/MqttMessageWithAck.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.stream.connectors.mqttv5.scaladsl
+
+import scala.concurrent.Future
+
+import org.apache.pekko.Done
+import org.apache.pekko.annotation.InternalApi
+import org.apache.pekko.stream.connectors.mqttv5.MqttMessage
+import org.apache.pekko.stream.connectors.mqttv5.javadsl
+import org.apache.pekko.util.FutureConverters._
+
+/**
+ * Scala API
+ *
+ * An MQTT message together with a handle to acknowledge its reception to MQTT.
+ */
+trait MqttMessageWithAck {
+
+ /**
+ * The message received from MQTT.
+ */
+ val message: MqttMessage
+
+ /**
+ * Signals `messageArrivedComplete` to MQTT.
+ *
+ * @return a future indicating whether the acknowledgement reached MQTT
+ */
+ def ack(): Future[Done]
+}
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[scaladsl] object MqttMessageWithAck {
+
+  /** Adapts a Java DSL message to the Scala DSL type; `ack()` is exposed as a `Future`. */
+  def fromJava(e: javadsl.MqttMessageWithAck): MqttMessageWithAck = new MqttMessageWithAck {
+    override val message: MqttMessage = e.message
+
+    // delegates to the Java handle and converts its completion stage
+    override def ack(): Future[Done] = e.ack().asScala
+  }
+}
diff --git
a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/scaladsl/MqttSink.scala
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/scaladsl/MqttSink.scala
new file mode 100644
index 000000000..28c158661
--- /dev/null
+++
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/scaladsl/MqttSink.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.stream.connectors.mqttv5.scaladsl
+
+import scala.concurrent.Future
+
+import org.apache.pekko.Done
+import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings
+import org.apache.pekko.stream.connectors.mqttv5.MqttMessage
+import org.apache.pekko.stream.connectors.mqttv5.MqttQoS
+import org.apache.pekko.stream.connectors.mqttv5.MqttSubscriptions
+import org.apache.pekko.stream.scaladsl.Keep
+import org.apache.pekko.stream.scaladsl.Sink
+
+/**
+ * Scala API
+ *
+ * Factory for sinks that publish messages to MQTT.
+ */
+object MqttSink {
+
+  /**
+   * Create a sink sending messages to MQTT.
+   *
+   * The materialized value completes on stream completion.
+   *
+   * @param defaultQos Quality of service level applied for messages not specifying a message specific value
+   */
+  def apply(connectionSettings: MqttConnectionSettings, defaultQos: MqttQoS): Sink[MqttMessage, Future[Done]] = {
+    // publish-only flow: no subscriptions, and whatever comes out of it is ignored
+    val publishOnly = MqttFlow.atMostOnce(connectionSettings, MqttSubscriptions.empty, 0, defaultQos)
+    publishOnly.toMat(Sink.ignore)(Keep.right)
+  }
+
+}
diff --git
a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/scaladsl/MqttSource.scala
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/scaladsl/MqttSource.scala
new file mode 100644
index 000000000..9a4a6ee56
--- /dev/null
+++
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/scaladsl/MqttSource.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.stream.connectors.mqttv5.scaladsl
+
+import scala.concurrent.Future
+
+import org.apache.pekko.Done
+import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings
+import org.apache.pekko.stream.connectors.mqttv5.MqttMessage
+import org.apache.pekko.stream.connectors.mqttv5.MqttQoS
+import org.apache.pekko.stream.connectors.mqttv5.MqttSubscriptions
+import org.apache.pekko.stream.scaladsl.Keep
+import org.apache.pekko.stream.scaladsl.Source
+
+/**
+ * Scala API
+ *
+ * Factory for sources that subscribe to MQTT messages.
+ */
+object MqttSource {
+
+  /**
+   * Create a source subscribing to MQTT messages (without a commit handle).
+   *
+   * The materialized value completes on successful connection to the MQTT broker.
+   *
+   * @param bufferSize max number of messages read from MQTT before back-pressure applies
+   */
+  def atMostOnce(
+      settings: MqttConnectionSettings,
+      subscriptions: MqttSubscriptions,
+      bufferSize: Int): Source[MqttMessage, Future[Done]] = {
+    val subscriber = MqttFlow.atMostOnce(settings, subscriptions, bufferSize, defaultQos = MqttQoS.AtLeastOnce)
+    // the flow is fed by `Source.maybe`; only the flow's materialized value is kept
+    Source.maybe.viaMat(subscriber)(Keep.right)
+  }
+
+  /**
+   * Create a source subscribing to MQTT messages with a commit handle to acknowledge message reception.
+   *
+   * The materialized value completes on successful connection to the MQTT broker.
+   *
+   * @param bufferSize max number of messages read from MQTT before back-pressure applies
+   */
+  def atLeastOnce(
+      settings: MqttConnectionSettings,
+      subscriptions: MqttSubscriptions,
+      bufferSize: Int): Source[MqttMessageWithAck, Future[Done]] = {
+    val subscriber = MqttFlow.atLeastOnce(settings, subscriptions, bufferSize, defaultQos = MqttQoS.AtLeastOnce)
+    // the flow is fed by `Source.maybe`; only the flow's materialized value is kept
+    Source.maybe.viaMat(subscriber)(Keep.right)
+  }
+
+}
diff --git
a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/settings.scala
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/settings.scala
new file mode 100644
index 000000000..d05d318b9
--- /dev/null
+++
b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/settings.scala
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.stream.connectors.mqttv5
+
+import java.nio.charset.StandardCharsets
+import java.util.Properties
+
+import scala.collection.immutable
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import org.apache.pekko.japi.Pair
+import org.apache.pekko.util.JavaDurationConverters._
+import org.apache.pekko.util.ccompat.JavaConverters._
+import org.eclipse.paho.mqttv5.client.MqttClientPersistence
+import org.eclipse.paho.mqttv5.client.MqttConnectionOptions
+import org.eclipse.paho.mqttv5.common.packet.MqttProperties
+import org.eclipse.paho.mqttv5.common.{ MqttMessage => PahoMqttMessage }
+
+/**
+ * Quality of Service constants as defined in
+ *
[[https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901234]]
+ */
+sealed abstract class MqttQoS {
+
+  /** The numeric QoS level (0, 1 or 2) of this constant. */
+  def value: Int
+}
+
+/**
+ * The three MQTT Quality of Service levels, as defined in
+ * [[https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901234]]
+ */
+object MqttQoS {
+
+  /**
+   * QoS 0: the message is delivered at most once (zero or one times). It is not persisted to
+   * disk and is not acknowledged across the network. This is the fastest level, but it should
+   * only be used for messages which are not valuable.
+   */
+  object AtMostOnce extends MqttQoS {
+    override val value: Int = 0
+  }
+
+  /**
+   * QoS 1: the message is delivered at least once (one or more times) and is acknowledged
+   * across the network. Safe delivery requires persistence, which the application supplies
+   * through [[MqttConnectionSettings]]; without a persistence mechanism the message will not be
+   * delivered in the event of a client failure.
+   */
+  object AtLeastOnce extends MqttQoS {
+    override val value: Int = 1
+  }
+
+  /**
+   * QoS 2: the message is delivered once. It is persisted to disk and is subject to a
+   * two-phase acknowledgement across the network. Safe delivery requires persistence, which
+   * the application supplies through [[MqttConnectionSettings]]; without a persistence
+   * mechanism the message will not be delivered in the event of a client failure.
+   */
+  object ExactlyOnce extends MqttQoS {
+    override val value: Int = 2
+  }
+
+  /**
+   * Java API
+   *
+   * Accessor for [[AtMostOnce]] (QoS 0): delivery at most once, not persisted and not
+   * acknowledged. Fastest level; only for messages which are not valuable.
+   */
+  def atMostOnce: MqttQoS = AtMostOnce
+
+  /**
+   * Java API
+   *
+   * Accessor for [[AtLeastOnce]] (QoS 1): delivery at least once, acknowledged across the
+   * network. Requires a persistence mechanism configured via [[MqttConnectionSettings]] to
+   * survive a client failure.
+   */
+  def atLeastOnce: MqttQoS = AtLeastOnce
+
+  /**
+   * Java API
+   *
+   * Accessor for [[ExactlyOnce]] (QoS 2): delivery exactly once, persisted and confirmed with a
+   * two-phase acknowledgement. Requires a persistence mechanism configured via
+   * [[MqttConnectionSettings]] to survive a client failure.
+   */
+  def exactlyOnce: MqttQoS = ExactlyOnce
+}
+
+/**
+ * The mapping of topics to subscribe to and the requested Quality of Service
([[MqttQoS]]) per topic.
+ */
+final class MqttSubscriptions private (
+    val subscriptions: Map[String, MqttQoS]) {
+
+  /** Scala API: returns an instance holding exactly the given topic-to-QoS mapping. */
+  def withSubscriptions(subscriptions: Map[String, MqttQoS]): MqttSubscriptions =
+    new MqttSubscriptions(subscriptions)
+
+  /** Java API: returns an instance holding exactly the given topic/QoS pairs. */
+  def withSubscriptions(subscriptions: java.util.List[pekko.japi.Pair[String, MqttQoS]]): MqttSubscriptions = {
+    val topicToQos = subscriptions.asScala.map(pair => pair.toScala).toMap
+    new MqttSubscriptions(topicToQos)
+  }
+
+  /**
+   * Returns an instance with `topic` added to the subscriptions configured already; an existing
+   * entry for the same topic is replaced.
+   */
+  def addSubscription(topic: String, qos: MqttQoS): MqttSubscriptions =
+    new MqttSubscriptions(subscriptions + (topic -> qos))
+}
+
+/**
+ * The mapping of topics to subscribe to and the requested Quality of Service
([[MqttQoS]]) per topic.
+ */
+object MqttSubscriptions {
+
+  /** Subscriptions without any topic. */
+  val empty: MqttSubscriptions = new MqttSubscriptions(Map.empty)
+
+  /** Scala API: subscriptions for the given topic-to-QoS mapping. */
+  def apply(subscriptions: Map[String, MqttQoS]): MqttSubscriptions =
+    new MqttSubscriptions(subscriptions)
+
+  /** Scala API: subscription to a single topic. */
+  def apply(topic: String, qos: MqttQoS): MqttSubscriptions =
+    new MqttSubscriptions(Map(topic -> qos))
+
+  /** Scala API: subscription to a single topic, given as a `(topic, qos)` tuple. */
+  def apply(subscription: (String, MqttQoS)): MqttSubscriptions = {
+    val (topic, qos) = subscription
+    new MqttSubscriptions(Map(topic -> qos))
+  }
+
+  /** Java API: subscriptions for the given list of topic/QoS pairs. */
+  def create(subscriptions: java.util.List[pekko.japi.Pair[String, MqttQoS]]): MqttSubscriptions = {
+    val topicToQos = subscriptions.asScala.map(pair => pair.toScala).toMap
+    new MqttSubscriptions(topicToQos)
+  }
+
+  /** Java API: subscription to a single topic. */
+  def create(topic: String, qos: MqttQoS): MqttSubscriptions =
+    new MqttSubscriptions(Map(topic -> qos))
+
+}
+
+/**
+ * Clean-start configuration of a connection.
+ *
+ * @param enabled value handed to Paho's `setCleanStart`
+ * @param sessionExpiration session expiry applied by
+ * `MqttConnectionSettings.asMqttConnectionOptions` only when `enabled` is `false`;
+ * `None` makes that method fall back to a very long expiry instead of leaving it unset
+ */
+private[mqttv5] final case class CleanStartSettings(
+ enabled: Boolean,
+ sessionExpiration: Option[FiniteDuration]
+)
+
+/**
+ * Parameters used when disconnecting from the broker.
+ *
+ * `MqttConnectionSettings.apply` initialises them to 30 seconds, 10 seconds and `true`.
+ *
+ * NOTE(review): none of these fields is read in this file; presumably the flow stage hands
+ * them to Paho's disconnect call - confirm against the stage implementation.
+ */
+private[mqttv5] final case class DisconnectSettings(
+ quiesceTimeout: FiniteDuration,
+ timeout: FiniteDuration,
+ sendDisconnectPacket: Boolean
+)
+
+/** Authentication mode of a connection; one of the cases in the companion object. */
+private[mqttv5] sealed trait AuthSettings {
+ // Rendering used by `MqttConnectionSettings.toString`; implementations below leave out secrets.
+ def asString: String
+}
+
+private[mqttv5] object AuthSettings {
+ /** No authentication: `asMqttConnectionOptions` sets no credentials. */
+ case object Disabled extends AuthSettings {
+ override lazy val asString: String = "Disabled"
+ }
+
+ /** Username/password authentication. */
+ final case class Simple(
+ username: String,
+ password: String
+ ) extends AuthSettings {
+ // Only the username is rendered; the password is deliberately left out.
+ override lazy val asString: String = s"Simple(username=$username)"
+ }
+
+ /**
+ * Enhanced authentication: `method` and `initialData` are handed to Paho as auth method and
+ * auth data.
+ *
+ * NOTE(review): `authPacketHandler` is not used in this file; the Java-API overload of
+ * `withAuth` names its inputs `reasonCode` and `properties`, so the result is presumably the
+ * reason code and properties to reply with - confirm against the flow stage.
+ */
+ final case class Enhanced(
+ method: String,
+ initialData: Array[Byte],
+ authPacketHandler: (Int, MqttProperties) => (Int, MqttProperties)
+ ) extends AuthSettings {
+ // Only the method is rendered; the initial data and the handler are left out.
+ override lazy val asString: String = s"Enhanced(method=$method)"
+ }
+}
+
+/**
+ * Offline buffering options; created by `MqttConnectionSettings.withOfflinePersistenceSettings`
+ * with the same default values.
+ *
+ * NOTE(review): the fields are not read in this file; presumably they are mapped onto Paho's
+ * disconnected-buffer options by the flow stage - confirm there.
+ */
+private[mqttv5] final case class MqttOfflinePersistenceSettings(
+ bufferSize: Int = 5000,
+ deleteOldestMessage: Boolean = false,
+ persistBuffer: Boolean = true
+)
+
+/**
+ * Connection settings passed to the underlying Paho client.
+ *
+ * Java docs for `MqttConnectionOptions` are not available;
+ * see [[https://github.com/eclipse-paho/paho.mqtt.java/issues/1012]] or
+ * [[org.eclipse.paho.mqttv5.client.MqttConnectionOptions]] for more info
+ */
+final class MqttConnectionSettings private (
+ val broker: String,
+ val clientId: String,
+ val persistence: MqttClientPersistence,
+ val cleanStart: CleanStartSettings,
+ val disconnect: DisconnectSettings,
+ val auth: AuthSettings,
+ val offlinePersistence: Option[MqttOfflinePersistenceSettings],
+ val automaticReconnect: Boolean,
+ val keepAliveInterval: FiniteDuration,
+ val connectionTimeout: FiniteDuration,
+ val serverUris: Array[String],
+ val will: Option[MqttMessage],
+ val sslProperties: Map[String, String],
+ val socketFactory: Option[javax.net.ssl.SSLSocketFactory],
+ val sslHostnameVerifier: Option[javax.net.ssl.HostnameVerifier]
+) {
+  /**
+   * Translates these settings into a fresh Paho `MqttConnectionOptions` instance.
+   *
+   * Optional settings (credentials, server URIs, will message, SSL properties, socket factory,
+   * hostname verifier) are only set on the result when they are configured here. A session
+   * expiry is only set when clean start is disabled.
+   *
+   * @return the options to connect the Paho client with
+   */
+  def asMqttConnectionOptions(): MqttConnectionOptions = {
+    val pahoOptions = new MqttConnectionOptions
+
+    auth match {
+      case AuthSettings.Simple(username, password) =>
+        pahoOptions.setUserName(username)
+        pahoOptions.setPassword(password.getBytes(StandardCharsets.UTF_8))
+
+      case AuthSettings.Enhanced(method, initialData, _) =>
+        pahoOptions.setAuthMethod(method)
+        pahoOptions.setAuthData(initialData)
+
+      case AuthSettings.Disabled =>
+        () // no credentials to set
+    }
+
+    pahoOptions.setCleanStart(cleanStart.enabled)
+    if (!cleanStart.enabled) {
+      // The documentation of `setSessionExpiryInterval` is not correct when no value is given -
+      // the server will treat a `null` as `0` and will immediately expire the session.
+      //
+      // Instead, the fallback is the maximum allowed value (~136 years of retention;
+      // see `org.eclipse.paho.mqttv5.common.packet.MqttProperties#setSessionExpiryInterval`)
+      val expirySeconds: Long =
+        cleanStart.sessionExpiration.fold(Int.MaxValue.toLong * 2)(_.toSeconds)
+      pahoOptions.setSessionExpiryInterval(expirySeconds)
+    }
+
+    if (serverUris.nonEmpty) pahoOptions.setServerURIs(serverUris)
+
+    will.foreach { lastWill =>
+      val pahoWill = new PahoMqttMessage(
+        lastWill.payload.toArray,
+        // a will without an explicit QoS is sent with QoS 1
+        lastWill.qos.getOrElse(MqttQoS.atLeastOnce).value,
+        lastWill.retained,
+        /* properties */ null
+      )
+      pahoOptions.setWill(lastWill.topic, pahoWill)
+    }
+
+    if (sslProperties.nonEmpty) {
+      val javaProperties = new Properties()
+      sslProperties.foreach { case (name, setting) => javaProperties.setProperty(name, setting) }
+      pahoOptions.setSSLProperties(javaProperties)
+    }
+
+    pahoOptions.setAutomaticReconnect(automaticReconnect)
+    // Paho takes whole seconds as `Int` for both of these
+    pahoOptions.setKeepAliveInterval(keepAliveInterval.toSeconds.toInt)
+    pahoOptions.setConnectionTimeout(connectionTimeout.toSeconds.toInt)
+    socketFactory.foreach(factory => pahoOptions.setSocketFactory(factory))
+    sslHostnameVerifier.foreach(verifier => pahoOptions.setSSLHostnameVerifier(verifier))
+
+    pahoOptions
+  }
+
+ /** Returns a copy of these settings with the broker address replaced. */
+ def withBroker(value: String): MqttConnectionSettings =
+ copy(broker = value)
+
+ /** Returns a copy of these settings with the client id replaced. */
+ def withClientId(value: String): MqttConnectionSettings =
+ copy(clientId = value)
+
+ /** Returns a copy of these settings with the Paho client persistence replaced. */
+ def withPersistence(value: MqttClientPersistence): MqttConnectionSettings =
+ copy(persistence = value)
+
+ /** Returns a copy of these settings that authenticates with username and password. */
+ def withAuth(
+ username: String,
+ password: String
+ ): MqttConnectionSettings =
+ copy(auth = AuthSettings.Simple(username = username, password = password))
+
+ /** Scala API: returns a copy of these settings that uses enhanced authentication. */
+ def withAuth(
+ method: String,
+ initialData: Array[Byte],
+ authPacketHandler: (Int, MqttProperties) => (Int, MqttProperties)
+ ): MqttConnectionSettings =
+ copy(
+ auth = AuthSettings.Enhanced(
+ method = method,
+ initialData = initialData,
+ authPacketHandler = authPacketHandler
+ )
+ )
+
+ /**
+ * Java API: returns a copy of these settings that uses enhanced authentication.
+ *
+ * The `BiFunction` is adapted to a Scala function and its `Pair` result converted to a tuple.
+ */
+ def withAuth(
+ method: String,
+ initialData: Array[Byte],
+ authPacketHandler: java.util.function.BiFunction[Int, MqttProperties,
Pair[Int, MqttProperties]]
+ ): MqttConnectionSettings =
+ withAuth(
+ method = method,
+ initialData = initialData,
+ authPacketHandler = (reasonCode: Int, properties: MqttProperties) =>
+ authPacketHandler.apply(reasonCode, properties).toScala
+ )
+
+ /** Returns a copy with the socket factory replaced; `null` clears it. */
+ def withSocketFactory(value: javax.net.ssl.SSLSocketFactory):
MqttConnectionSettings =
+ copy(socketFactory = Option(value))
+
+ /** Returns a copy with clean start set as given and no explicit session expiration. */
+ def withCleanStart(enabled: Boolean): MqttConnectionSettings =
+ copy(cleanStart = CleanStartSettings(enabled = enabled, sessionExpiration
= None))
+
+ /** Scala API: returns a copy with clean start and session expiration set as given. */
+ def withCleanStart(enabled: Boolean, sessionExpiration:
Option[FiniteDuration]): MqttConnectionSettings =
+ copy(cleanStart = CleanStartSettings(enabled, sessionExpiration))
+
+ /** Java API: as the Scala variant; a `null` duration means no explicit session expiration. */
+ def withCleanStart(enabled: Boolean, sessionExpiration: java.time.Duration):
MqttConnectionSettings =
+ copy(cleanStart = CleanStartSettings(enabled,
Option(sessionExpiration).map(_.asScala)))
+
+ /** Returns a copy with the will message replaced; `null` clears it. */
+ def withWill(value: MqttMessage): MqttConnectionSettings =
+ copy(will = Option(value))
+
+ /** Returns a copy with the automatic-reconnect flag replaced. */
+ def withAutomaticReconnect(value: Boolean): MqttConnectionSettings =
+ copy(automaticReconnect = value)
+
+ /** Scala API: returns a copy with the keep-alive interval replaced (passed to Paho in whole seconds). */
+ def withKeepAliveInterval(value: FiniteDuration): MqttConnectionSettings =
+ copy(keepAliveInterval = value)
+
+ /** Java API: returns a copy with the keep-alive interval replaced. */
+ def withKeepAliveInterval(value: java.time.Duration): MqttConnectionSettings
=
+ withKeepAliveInterval(value.asScala)
+
+ /** Scala API: returns a copy with the connection timeout replaced (passed to Paho in whole seconds). */
+ def withConnectionTimeout(value: FiniteDuration): MqttConnectionSettings =
+ copy(connectionTimeout = value)
+
+ /** Java API: returns a copy with the connection timeout replaced. */
+ def withConnectionTimeout(value: java.time.Duration): MqttConnectionSettings
=
+ withConnectionTimeout(value.asScala)
+
+ /** Scala API: returns a copy with the disconnect quiesce timeout replaced. */
+ def withDisconnectQuiesceTimeout(value: FiniteDuration):
MqttConnectionSettings =
+ copy(disconnect = disconnect.copy(quiesceTimeout = value))
+
+ /** Java API: returns a copy with the disconnect quiesce timeout replaced. */
+ def withDisconnectQuiesceTimeout(value: java.time.Duration):
MqttConnectionSettings =
+ withDisconnectQuiesceTimeout(value.asScala)
+
+ /** Scala API: returns a copy with the disconnect timeout replaced. */
+ def withDisconnectTimeout(value: FiniteDuration): MqttConnectionSettings =
+ copy(disconnect = disconnect.copy(timeout = value))
+
+ /** Java API: returns a copy with the disconnect timeout replaced. */
+ def withDisconnectTimeout(value: java.time.Duration): MqttConnectionSettings
=
+ withDisconnectTimeout(value.asScala)
+
+ /** Returns a copy whose server URIs consist of only the given one (previous URIs are dropped). */
+ def withServerUri(value: String): MqttConnectionSettings =
+ copy(serverUris = Array(value))
+
+ /** Scala API: returns a copy with the server URIs replaced. */
+ def withServerUris(value: immutable.Seq[String]): MqttConnectionSettings =
+ copy(serverUris = value.toArray)
+
+ /** Java API: returns a copy with the server URIs replaced. */
+ def withServerUris(value: java.util.List[String]): MqttConnectionSettings =
+ copy(serverUris = value.asScala.toArray)
+
+ /** Returns a copy with the SSL hostname verifier replaced; `null` clears it. */
+ def withSslHostnameVerifier(value: javax.net.ssl.HostnameVerifier):
MqttConnectionSettings =
+ copy(sslHostnameVerifier = Option(value))
+
+ /** Scala API: returns a copy with the SSL properties replaced. */
+ def withSslProperties(value: Map[String, String]): MqttConnectionSettings =
+ copy(sslProperties = value)
+
+ /** Java API: returns a copy with the SSL properties replaced. */
+ def withSslProperties(value: java.util.Map[String, String]):
MqttConnectionSettings =
+ withSslProperties(value = value.asScala.toMap)
+
+ /**
+ * Returns a copy with offline persistence enabled using the given options.
+ *
+ * The defaults here are the same as those declared on `MqttOfflinePersistenceSettings`.
+ */
+ def withOfflinePersistenceSettings(
+ bufferSize: Int = 5000,
+ deleteOldestMessage: Boolean = false,
+ persistBuffer: Boolean = true
+ ): MqttConnectionSettings =
+ copy(
+ offlinePersistence = Option(
+ MqttOfflinePersistenceSettings(
+ bufferSize = bufferSize,
+ deleteOldestMessage = deleteOldestMessage,
+ persistBuffer = persistBuffer
+ )
+ )
+ )
+
+ // Builds a new instance in which every field defaults to the current value; the `with*`
+ // methods override single fields through it.
+ // The parameter order here differs from the constructor's, which is harmless because the
+ // constructor call below passes every argument by name.
+ private def copy(
+ broker: String = broker,
+ clientId: String = clientId,
+ persistence: MqttClientPersistence = persistence,
+ cleanStart: CleanStartSettings = cleanStart,
+ disconnect: DisconnectSettings = disconnect,
+ offlinePersistence: Option[MqttOfflinePersistenceSettings] =
offlinePersistence,
+ auth: AuthSettings = auth,
+ automaticReconnect: Boolean = automaticReconnect,
+ keepAliveInterval: FiniteDuration = keepAliveInterval,
+ connectionTimeout: FiniteDuration = connectionTimeout,
+ serverUris: Array[String] = serverUris,
+ will: Option[MqttMessage] = will,
+ sslProperties: Map[String, String] = sslProperties,
+ socketFactory: Option[javax.net.ssl.SSLSocketFactory] = socketFactory,
+ sslHostnameVerifier: Option[javax.net.ssl.HostnameVerifier] =
sslHostnameVerifier
+ ): MqttConnectionSettings =
+ new MqttConnectionSettings(
+ broker = broker,
+ clientId = clientId,
+ persistence = persistence,
+ disconnect = disconnect,
+ offlinePersistence = offlinePersistence,
+ auth = auth,
+ cleanStart = cleanStart,
+ automaticReconnect = automaticReconnect,
+ keepAliveInterval = keepAliveInterval,
+ connectionTimeout = connectionTimeout,
+ serverUris = serverUris,
+ will = will,
+ sslProperties = sslProperties,
+ socketFactory = socketFactory,
+ sslHostnameVerifier = sslHostnameVerifier
+ )
+
+  /**
+   * Renders all settings for logging and debugging.
+   *
+   * Authentication is rendered through `AuthSettings.asString`, which leaves out the password
+   * and the enhanced-auth data.
+   */
+  override def toString: String = {
+    // `serverUris` is a Java array: interpolating it directly would print the JVM identity
+    // string (`[Ljava.lang.String;@...`) instead of the configured URIs.
+    val renderedServerUris = serverUris.mkString("[", ",", "]")
+
+    "MqttConnectionSettings(" +
+    s"broker=$broker," +
+    s"clientId=$clientId," +
+    s"persistence=$persistence," +
+    s"disconnect=$disconnect," +
+    s"offlinePersistence=$offlinePersistence," +
+    s"auth=${auth.asString}," +
+    s"cleanStart=$cleanStart," +
+    s"automaticReconnect=$automaticReconnect," +
+    s"keepAliveInterval=$keepAliveInterval," +
+    s"connectionTimeout=$connectionTimeout," +
+    s"serverUris=$renderedServerUris," +
+    s"will=$will," +
+    s"sslProperties=$sslProperties," +
+    s"socketFactory=$socketFactory," +
+    s"sslHostnameVerifier=$sslHostnameVerifier" +
+    ")"
+  }
+}
+
+/**
+ * Factory for connection settings passed to the underlying Paho client.
+ *
+ * Java docs for `MqttConnectionOptions` are not available;
+ * see [[https://github.com/eclipse-paho/paho.mqtt.java/issues/1012]] or
+ * [[org.eclipse.paho.mqttv5.client.MqttConnectionOptions]] for more info
+ */
+object MqttConnectionSettings {
+
+  /**
+   * Scala API: creates connection settings with default values for everything but the broker
+   * address, the client id and the client persistence.
+   *
+   * Clean start, keep-alive interval and connection timeout take the defaults of a freshly
+   * created Paho `MqttConnectionOptions`; authentication, offline persistence, will message,
+   * SSL settings and automatic reconnect start out disabled or empty.
+   */
+  def apply(
+      broker: String,
+      clientId: String,
+      persistence: MqttClientPersistence
+  ): MqttConnectionSettings = {
+    // Only used to read Paho's own default values.
+    val pahoDefaults = new MqttConnectionOptions
+
+    val defaultDisconnect = DisconnectSettings(
+      quiesceTimeout = 30.seconds,
+      timeout = 10.seconds,
+      sendDisconnectPacket = true
+    )
+    val defaultCleanStart =
+      CleanStartSettings(enabled = pahoDefaults.isCleanStart, sessionExpiration = None)
+
+    new MqttConnectionSettings(
+      broker = broker,
+      clientId = clientId,
+      persistence = persistence,
+      cleanStart = defaultCleanStart,
+      disconnect = defaultDisconnect,
+      auth = AuthSettings.Disabled,
+      offlinePersistence = None,
+      automaticReconnect = false,
+      keepAliveInterval = pahoDefaults.getKeepAliveInterval.seconds,
+      connectionTimeout = pahoDefaults.getConnectionTimeout.seconds,
+      serverUris = Array.empty,
+      will = None,
+      sslProperties = Map.empty,
+      socketFactory = None,
+      sslHostnameVerifier = None
+    )
+  }
+
+  /** Java API: creates connection settings with the same defaults as `apply`. */
+  def create(
+      broker: String,
+      clientId: String,
+      persistence: MqttClientPersistence
+  ): MqttConnectionSettings =
+    apply(broker, clientId, persistence)
+}
diff --git a/mqttv5/src/test/java/docs/javadsl/MqttFlowTest.java
b/mqttv5/src/test/java/docs/javadsl/MqttFlowTest.java
new file mode 100644
index 000000000..99c770b8e
--- /dev/null
+++ b/mqttv5/src/test/java/docs/javadsl/MqttFlowTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.javadsl;
+
+import org.apache.pekko.Done;
+import org.apache.pekko.NotUsed;
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.japi.Pair;
+import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings;
+import org.apache.pekko.stream.connectors.mqttv5.MqttMessage;
+import org.apache.pekko.stream.connectors.mqttv5.MqttQoS;
+import org.apache.pekko.stream.connectors.mqttv5.MqttSubscriptions;
+import org.apache.pekko.stream.connectors.mqttv5.javadsl.MqttFlow;
+import org.apache.pekko.stream.connectors.mqttv5.javadsl.MqttMessageWithAck;
+import
org.apache.pekko.stream.connectors.mqttv5.javadsl.MqttMessageWithAckImpl;
+import org.apache.pekko.stream.connectors.testkit.javadsl.LogCapturingJunit4;
+import org.apache.pekko.stream.javadsl.Flow;
+import org.apache.pekko.stream.javadsl.Keep;
+import org.apache.pekko.stream.javadsl.Sink;
+import org.apache.pekko.stream.javadsl.Source;
+import org.apache.pekko.testkit.javadsl.TestKit;
+import org.apache.pekko.util.ByteString;
+import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+import static org.junit.Assert.assertFalse;
+
+/** Documentation tests for the Java `MqttFlow` API; they need an MQTT broker on localhost:1883. */
+public class MqttFlowTest {
+
+  @Rule public final LogCapturingJunit4 logCapturing = new LogCapturingJunit4();
+
+  private static final Logger log = LoggerFactory.getLogger(MqttFlowTest.class);
+
+  private static ActorSystem system;
+
+  private static final int bufferSize = 8;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    system = ActorSystem.create("MqttFlowTest");
+  }
+
+  @AfterClass
+  public static void teardown() {
+    TestKit.shutdownActorSystem(system);
+  }
+
+  @Test
+  public void establishBidirectionalConnectionAndSubscribeToATopic() throws Exception {
+    final MqttConnectionSettings connectionSettings =
+        MqttConnectionSettings.create(
+            "tcp://localhost:1883", "test-java-client", new MemoryPersistence());
+
+    // #create-flow
+    final Flow<MqttMessage, MqttMessage, CompletionStage<Done>> mqttFlow =
+        MqttFlow.atMostOnce(
+            connectionSettings,
+            MqttSubscriptions.create("v5/flow-test/topic", MqttQoS.atMostOnce()),
+            bufferSize,
+            MqttQoS.atLeastOnce());
+    // #create-flow
+
+    final Source<MqttMessage, CompletableFuture<Optional<MqttMessage>>> source = Source.maybe();
+
+    // #run-flow
+    final Pair<
+            Pair<CompletableFuture<Optional<MqttMessage>>, CompletionStage<Done>>,
+            CompletionStage<List<MqttMessage>>>
+        materialized =
+            source.viaMat(mqttFlow, Keep.both()).toMat(Sink.seq(), Keep.both()).run(system);
+
+    CompletableFuture<Optional<MqttMessage>> mqttMessagePromise = materialized.first().first();
+    CompletionStage<Done> subscribedToMqtt = materialized.first().second();
+    CompletionStage<List<MqttMessage>> streamResult = materialized.second();
+    // #run-flow
+
+    // Block on every step: an assertion inside an un-awaited `thenAccept` callback would run
+    // (if at all) after the test method has returned and could never fail the test.
+    subscribedToMqtt.toCompletableFuture().get(5, java.util.concurrent.TimeUnit.SECONDS);
+    mqttMessagePromise.complete(Optional.empty());
+    // `get` rethrows a stream failure, so a failed stream fails the test here
+    streamResult.toCompletableFuture().get(5, java.util.concurrent.TimeUnit.SECONDS);
+    assertFalse(streamResult.toCompletableFuture().isCompletedExceptionally());
+  }
+
+  @Test
+  public void sendAnAckAfterMessageSent() throws Exception {
+    final MqttMessageWithAckFake testMessage = new MqttMessageWithAckFake();
+
+    final Source<MqttMessageWithAck, NotUsed> source =
+        Source.<MqttMessageWithAck>single(testMessage);
+
+    final MqttConnectionSettings connectionSettings =
+        MqttConnectionSettings.create(
+            "tcp://localhost:1883", "test-java-client-ack", new MemoryPersistence());
+    // #create-flow-ack
+    final Flow<MqttMessageWithAck, MqttMessageWithAck, CompletionStage<Done>> mqttFlow =
+        MqttFlow.atLeastOnceWithAck(
+            connectionSettings,
+            MqttSubscriptions.create("v5/flow-test/topic-ack", MqttQoS.atMostOnce()),
+            bufferSize,
+            MqttQoS.atLeastOnce());
+    // #create-flow-ack
+
+    // #run-flow-ack
+    final Pair<Pair<NotUsed, CompletionStage<Done>>, CompletionStage<List<MqttMessageWithAck>>>
+        materialized =
+            source.viaMat(mqttFlow, Keep.both()).toMat(Sink.seq(), Keep.both()).run(system);
+
+    // #run-flow-ack
+
+    // Poll for up to ten seconds until the stream has acknowledged the message.
+    for (int i = 0; (i < 10 && !testMessage.acked); i++) {
+      Thread.sleep(1000);
+    }
+
+    // A JUnit assertion instead of the `assert` keyword, which is skipped unless the JVM runs
+    // with assertions enabled (-ea).
+    org.junit.Assert.assertTrue("message was not acked within 10 seconds", testMessage.acked);
+  }
+
+  /** Message stub that records whether `ack()` has been called. */
+  static class MqttMessageWithAckFake extends MqttMessageWithAckImpl {
+    // Written by a stream thread and polled by the test thread, hence volatile.
+    volatile boolean acked;
+
+    MqttMessageWithAckFake() {
+      acked = false;
+    }
+
+    @Override
+    public CompletionStage<Done> ack() {
+      acked = true;
+      System.out.println("[MqttMessageWithAckImpl]");
+      return CompletableFuture.completedFuture(Done.getInstance());
+    }
+
+    @Override
+    public MqttMessage message() {
+      return MqttMessage.create("topic", ByteString.fromString("hi!"));
+    }
+  }
+}
diff --git a/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
b/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
new file mode 100644
index 000000000..3d769ba31
--- /dev/null
+++ b/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.javadsl;
+
+import org.apache.pekko.Done;
+import org.apache.pekko.NotUsed;
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.japi.Pair;
+import org.apache.pekko.stream.KillSwitches;
+import org.apache.pekko.stream.UniqueKillSwitch;
+import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings;
+import org.apache.pekko.stream.connectors.mqttv5.MqttMessage;
+import org.apache.pekko.stream.connectors.mqttv5.MqttQoS;
+import org.apache.pekko.stream.connectors.mqttv5.MqttSubscriptions;
+import org.apache.pekko.stream.connectors.mqttv5.javadsl.MqttMessageWithAck;
+import org.apache.pekko.stream.connectors.mqttv5.javadsl.MqttSink;
+import org.apache.pekko.stream.connectors.mqttv5.javadsl.MqttSource;
+import org.apache.pekko.stream.connectors.testkit.javadsl.LogCapturingJunit4;
+import org.apache.pekko.stream.javadsl.*;
+import org.apache.pekko.stream.testkit.TestSubscriber;
+import org.apache.pekko.stream.testkit.javadsl.TestSink;
+import org.apache.pekko.testkit.javadsl.TestKit;
+import org.apache.pekko.util.ByteString;
+import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class MqttSourceTest {
+
+ @Rule
+ public final LogCapturingJunit4 logCapturing = new LogCapturingJunit4();
+
+ private static final Logger log =
LoggerFactory.getLogger(MqttSourceTest.class);
+
+ private static ActorSystem system;
+
+ private static final int bufferSize = 8;
+
+    // One actor system is shared by all tests of this class.
+    @BeforeClass
+    public static void setup() throws Exception {
+        system = ActorSystem.create("MqttSourceTest");
+    }
+
+    // Shuts the shared actor system down once all tests have run.
+    @AfterClass
+    public static void teardown() {
+        TestKit.shutdownActorSystem(system);
+    }
+
+    // Documentation snippet for creating connection settings; only checks that the broker
+    // address shows up in the settings' string rendering (no broker connection is made).
+    @Test
+    public void connectionSettings() {
+        // #create-connection-settings
+        MqttConnectionSettings connectionSettings =
+                MqttConnectionSettings.create(
+                        "tcp://localhost:1883", // (1)
+                        "test-java-client", // (2)
+                        new MemoryPersistence() // (3)
+                );
+        // #create-connection-settings
+        assertThat(connectionSettings.toString(),
containsString("tcp://localhost:1883"));
+    }
+
+    // Documentation snippet for SSL settings; checks that the broker address is rendered and
+    // that the username reaches the Paho connection options (no broker connection is made).
+    @Test
+    public void connectionSettingsForSsl() throws Exception {
+        // #ssl-settings
+        MqttConnectionSettings connectionSettings =
+                MqttConnectionSettings.create("ssl://localhost:1885",
"ssl-client", new MemoryPersistence())
+                        .withAuth("mqttUser", "mqttPassword")
+
.withSocketFactory(SSLContext.getDefault().getSocketFactory());
+        // #ssl-settings
+        assertThat(connectionSettings.toString(),
containsString("ssl://localhost:1885"));
+        assertThat(connectionSettings.asMqttConnectionOptions().getUserName(),
is("mqttUser"));
+    }
+
+    // Publishes five messages and consumes them twice with the same source definition:
+    // the first run collects them without acknowledging, the second run acknowledges each
+    // message and expects to receive the same five payloads again.
+    @Test
+    public void publishAndConsumeWithoutAutoAck() throws Exception {
+        final String topic = "v5/source-test/manualacks";
+        final MqttConnectionSettings baseConnectionSettings =
+                MqttConnectionSettings.create(
+                        "tcp://localhost:1883", "test-java-client", new
MemoryPersistence());
+
+        MqttConnectionSettings connectionSettings = baseConnectionSettings;
+
+        final List<String> input = Arrays.asList("one", "two", "three",
"four", "five");
+
+        // #create-source-with-manualacks
+        Source<MqttMessageWithAck, CompletionStage<Done>> mqttSource =
+                MqttSource.atLeastOnce(
+                        connectionSettings
+
.withClientId("source-test/source-withoutAutoAck")
+                                .withCleanStart(false),
+                        MqttSubscriptions.create(topic, MqttQoS.atLeastOnce()),
+                        bufferSize);
+        // #create-source-with-manualacks
+
+        // First run: take the messages but never call ack() on them.
+        final Pair<CompletionStage<Done>,
CompletionStage<List<MqttMessageWithAck>>> unackedResult =
+                mqttSource.take(input.size()).toMat(Sink.seq(),
Keep.both()).run(system);
+
+        // Wait for the subscription before publishing.
+        unackedResult.first().toCompletableFuture().get(5, TimeUnit.SECONDS);
+
+        final Sink<MqttMessage, CompletionStage<Done>> mqttSink =
+                MqttSink.create(
+
baseConnectionSettings.withClientId("source-test/sink-withoutAutoAck"),
+                        MqttQoS.atLeastOnce());
+        Source.from(input)
+                .map(s -> MqttMessage.create(topic, ByteString.fromString(s)))
+                .runWith(mqttSink, system);
+
+        assertEquals(
+                input,
+                unackedResult.second().toCompletableFuture().get(5,
TimeUnit.SECONDS).stream()
+                        .map(m -> m.message().payload().utf8String())
+                        .collect(Collectors.toList()));
+
+        // Identity flow standing in for application logic in the snippet below.
+        Flow<MqttMessageWithAck, MqttMessageWithAck, NotUsed> businessLogic =
Flow.create();
+
+        // #run-source-with-manualacks
+        final CompletionStage<List<MqttMessage>> result =
+                mqttSource
+                        .via(businessLogic)
+                        .mapAsync(
+                                1,
+                                messageWithAck ->
+                                        messageWithAck.ack().thenApply(unused2
-> messageWithAck.message()))
+                        .take(input.size())
+                        .runWith(Sink.seq(), system);
+        // #run-source-with-manualacks
+
+        // Second run (same client id, clean start disabled): the same payloads are expected.
+        assertEquals(
+                input,
+                result.toCompletableFuture().get(3, TimeUnit.SECONDS).stream()
+                        .map(m -> m.payload().utf8String())
+                        .collect(Collectors.toList()));
+    }
+
+    // Takes five messages without acknowledging them, which closes the downstream side of the
+    // source, and only then acknowledges each message. Every ack has to complete successfully.
+    @Test
+    public void keepConnectionOpenIfDownstreamClosesAndThereArePendingAcks() throws Exception {
+        final String topic = "v5/source-test/pendingacks";
+        final MqttConnectionSettings baseConnectionSettings =
+                MqttConnectionSettings.create(
+                        "tcp://localhost:1883", "test-java-client", new MemoryPersistence());
+
+        MqttConnectionSettings sourceSettings =
+                baseConnectionSettings.withClientId("source-test/source-pending");
+        MqttConnectionSettings sinkSettings =
+                baseConnectionSettings.withClientId("source-test/sink-pending");
+
+        final Sink<MqttMessage, CompletionStage<Done>> mqttSink =
+                MqttSink.create(sinkSettings, MqttQoS.atLeastOnce());
+        final List<String> input = Arrays.asList("one", "two", "three", "four", "five");
+
+        MqttConnectionSettings connectionSettings = sourceSettings.withCleanStart(false);
+        MqttSubscriptions subscriptions = MqttSubscriptions.create(topic, MqttQoS.atLeastOnce());
+        final Source<MqttMessageWithAck, CompletionStage<Done>> mqttSource =
+                MqttSource.atLeastOnce(connectionSettings, subscriptions, bufferSize);
+
+        final Pair<CompletionStage<Done>, CompletionStage<List<MqttMessageWithAck>>> unackedResult =
+                mqttSource.take(input.size()).toMat(Sink.seq(), Keep.both()).run(system);
+
+        // Wait for the subscription before publishing.
+        unackedResult.first().toCompletableFuture().get(5, TimeUnit.SECONDS);
+
+        Source.from(input)
+                .map(s -> MqttMessage.create(topic, ByteString.fromString(s)))
+                .runWith(mqttSink, system)
+                .toCompletableFuture()
+                .get(3, TimeUnit.SECONDS);
+
+        final List<MqttMessageWithAck> unackedMessages =
+                unackedResult.second().toCompletableFuture().get(5, TimeUnit.SECONDS);
+
+        // A plain loop lets a failed or timed-out ack propagate and fail the test. The previous
+        // catch block called assertFalse(message, false), which always passes and therefore
+        // swallowed every ack failure.
+        for (MqttMessageWithAck unacked : unackedMessages) {
+            unacked.ack().toCompletableFuture().get(3, TimeUnit.SECONDS);
+        }
+    }
+
+    // Subscribes one source to two topics, publishes seven messages to each of them and
+    // expects all fourteen "<topic>-<payload>" strings to arrive.
+    @Test
+    public void receiveFromMultipleTopics() throws Exception {
+        final String topic1 = "v5/source-test/topic1";
+        final String topic2 = "v5/source-test/topic2";
+
+        MqttConnectionSettings connectionSettings =
+                MqttConnectionSettings.create(
+                        "tcp://localhost:1883", "test-java-client", new
MemoryPersistence());
+
+        final Integer messageCount = 7;
+
+        // #create-source
+        MqttSubscriptions subscriptions =
+                MqttSubscriptions.create(topic1, MqttQoS.atMostOnce())
+                        .addSubscription(topic2, MqttQoS.atMostOnce());
+
+        Source<MqttMessage, CompletionStage<Done>> mqttSource =
+                MqttSource.atMostOnce(
+                        connectionSettings.withClientId("source-test/source"),
subscriptions, bufferSize);
+
+        Pair<CompletionStage<Done>, CompletionStage<List<String>>>
materialized =
+                mqttSource
+                        .map(m -> m.topic() + "-" + m.payload().utf8String())
+                        .take(messageCount * 2)
+                        .toMat(Sink.seq(), Keep.both())
+                        .run(system);
+
+        CompletionStage<Done> subscribed = materialized.first();
+        CompletionStage<List<String>> streamResult = materialized.second();
+        // #create-source
+
+        // Wait for the subscription before publishing.
+        subscribed.toCompletableFuture().get(3, TimeUnit.SECONDS);
+
+        // msg0..msg6, each published once to topic1 and once to topic2
+        List<MqttMessage> messages =
+                IntStream.range(0, messageCount)
+                        .boxed()
+                        .flatMap(
+                                i ->
+                                        Stream.of(
+                                                MqttMessage.create(topic1,
ByteString.fromString("msg" + i.toString())),
+                                                MqttMessage.create(topic2,
ByteString.fromString("msg" + i.toString()))))
+                        .collect(Collectors.toList());
+
+        // #run-sink
+        Sink<MqttMessage, CompletionStage<Done>> mqttSink =
+
MqttSink.create(connectionSettings.withClientId("source-test/sink"),
MqttQoS.atLeastOnce());
+        Source.from(messages).runWith(mqttSink, system);
+        // #run-sink
+
+        // Compared as sets: the arrival order is not asserted.
+        assertEquals(
+                IntStream.range(0, messageCount)
+                        .boxed()
+                        .flatMap(i -> Stream.of("v5/source-test/topic1-msg" +
i, "v5/source-test/topic2-msg" + i))
+                        .collect(Collectors.toSet()),
+                new HashSet<>(streamResult.toCompletableFuture().get(3,
TimeUnit.SECONDS)));
+    }
+
+    // Connects a client with a will message through a local TCP proxy, cuts the proxy to
+    // simulate an unexpected disconnect, and expects a second client subscribed to the will
+    // topic to receive the will message.
+    @Test
+    public void supportWillMessage() throws Exception {
+        String topic1 = "v5/source-test/topic1";
+        String willTopic = "v5/source-test/will";
+        final MqttConnectionSettings baseConnectionSettings =
+                MqttConnectionSettings.create(
+                        "tcp://localhost:1883", "test-java-client", new
MemoryPersistence());
+        MqttConnectionSettings sourceSettings =
+
baseConnectionSettings.withClientId("source-test/source-withoutAutoAck");
+        MqttConnectionSettings sinkSettings =
+
baseConnectionSettings.withClientId("source-test/sink-withoutAutoAck");
+
+        MqttMessage msg = MqttMessage.create(topic1,
ByteString.fromString("ohi"));
+
+        // #will-message
+        MqttMessage lastWill =
+                MqttMessage.create(willTopic, ByteString.fromString("ohi"))
+                        .withQos(MqttQoS.atLeastOnce())
+                        .withRetained(true);
+        // #will-message
+
+        // Create a TCP proxy in front of the MQTT broker (localhost:1883) so the connection can be cut
+        int proxyPort = 1347; // make sure to keep it separate from ports used
by other tests
+        Pair<CompletionStage<Tcp.ServerBinding>,
CompletionStage<Tcp.IncomingConnection>> result1 =
+                Tcp.get(system).bind("localhost",
proxyPort).toMat(Sink.head(), Keep.both()).run(system);
+
+        // Forward the first incoming connection to the broker; the kill switch cuts it later.
+        CompletionStage<UniqueKillSwitch> proxyKs =
+                result1
+                        .second()
+                        .toCompletableFuture()
+                        .thenApply(
+                                conn ->
+                                        conn.handleWith(
+                                                Tcp.get(system)
+
.outgoingConnection("localhost", 1883)
+
.viaMat(KillSwitches.single(), Keep.right()),
+                                                system));
+
+        // Wait until the proxy port is bound.
+        result1.first().toCompletableFuture().get(5, TimeUnit.SECONDS);
+
+        // The client that carries the will connects through the proxy.
+        MqttConnectionSettings settings1 =
+                sourceSettings
+                        .withClientId("source-test/testator")
+                        .withBroker("tcp://localhost:" + proxyPort)
+                        .withWill(lastWill);
+        MqttSubscriptions subscriptions = MqttSubscriptions.create(topic1,
MqttQoS.atLeastOnce());
+
+        Source<MqttMessage, CompletionStage<Done>> source1 =
+                MqttSource.atMostOnce(settings1, subscriptions, bufferSize);
+
+        Pair<CompletionStage<Done>, TestSubscriber.Probe<MqttMessage>> result2
=
+                source1.toMat(TestSink.probe(system), Keep.both()).run(system);
+
+        // Ensure that the connection made it all the way to the server by
waiting until it receives a
+        // message
+        result2.first().toCompletableFuture().get(5, TimeUnit.SECONDS);
+        Source.single(msg).runWith(MqttSink.create(sinkSettings,
MqttQoS.atLeastOnce()), system);
+        result2.second().requestNext();
+
+        // Kill the proxy, producing an unexpected disconnection of the client
+        proxyKs.toCompletableFuture().get(5, TimeUnit.SECONDS).shutdown();
+
+        // A second client, connected to the broker directly, subscribes to the will topic.
+        MqttConnectionSettings settings2 =
sourceSettings.withClientId("source-test/executor");
+        MqttSubscriptions subscriptions2 = MqttSubscriptions.create(willTopic,
MqttQoS.atLeastOnce());
+        Source<MqttMessage, CompletionStage<Done>> source2 =
+                MqttSource.atMostOnce(settings2, subscriptions2, bufferSize);
+
+        CompletionStage<MqttMessage> elem = source2.runWith(Sink.head(),
system);
+        assertEquals(
+                MqttMessage.create(willTopic, ByteString.fromString("ohi")),
+                elem.toCompletableFuture().get(3, TimeUnit.SECONDS));
+    }
+}
diff --git a/mqttv5/src/test/resources/application.conf
b/mqttv5/src/test/resources/application.conf
new file mode 100644
index 000000000..a1d8c93dd
--- /dev/null
+++ b/mqttv5/src/test/resources/application.conf
@@ -0,0 +1,7 @@
+# SPDX-License-Identifier: Apache-2.0
+
+pekko {
+ loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"]
+ logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"
+ loglevel = "DEBUG"
+}
diff --git a/mqttv5/src/test/resources/logback-test.xml
b/mqttv5/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..68376b99a
--- /dev/null
+++ b/mqttv5/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<configuration>
+ <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+ <file>target/mqtt.log</file>
+ <append>false</append>
+ <encoder>
+ <pattern>%d{ISO8601} %-5level [%thread] [%logger{36}]
%msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} %-5level [%-20.20thread]
%-36.36logger{36} %msg%n%rEx</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="CapturingAppender"
class="org.apache.pekko.stream.connectors.testkit.CapturingAppender"/>
+
+ <logger
name="org.apache.pekko.stream.connectors.testkit.CapturingAppenderDelegate">
+ <appender-ref ref="STDOUT"/>
+ </logger>
+
+ <logger name="org.apache.pekko" level="DEBUG"/>
+ <logger name="org.apache.pekko.stream.connectors" level="debug"/>
+
+ <root level="debug">
+ <appender-ref ref="CapturingAppender"/>
+ <appender-ref ref="FILE" />
+ </root>
+</configuration>
diff --git a/mqttv5/src/test/scala/docs/scaladsl/MqttFlowSpec.scala
b/mqttv5/src/test/scala/docs/scaladsl/MqttFlowSpec.scala
new file mode 100644
index 000000000..fde27f43b
--- /dev/null
+++ b/mqttv5/src/test/scala/docs/scaladsl/MqttFlowSpec.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.scaladsl
+
+import scala.concurrent.Future
+import scala.concurrent.Promise
+
+import org.apache.pekko.Done
+import org.apache.pekko.stream.connectors.mqttv5.MqttMessage
+import org.apache.pekko.stream.connectors.mqttv5.MqttQoS
+import org.apache.pekko.stream.connectors.mqttv5.MqttSubscriptions
+import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttFlow
+import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttMessageWithAck
+import org.apache.pekko.stream.scaladsl.Flow
+import org.apache.pekko.stream.scaladsl.Keep
+import org.apache.pekko.stream.scaladsl.Sink
+import org.apache.pekko.stream.scaladsl.Source
+import org.apache.pekko.util.ByteString
+
+/**
+ * Exercises the `MqttFlow` factories of the MQTT v5 connector using the `connectionSettings`
+ * inherited from `MqttSpecBase`.
+ *
+ * NOTE(review): the paired `// #label` comments look like documentation snippet regions;
+ * presumably the code between them is rendered in the docs, so keep it self-explanatory — confirm.
+ */
+class MqttFlowSpec extends MqttSpecBase("MqttFlowSpec") {
+
+ "mqtt flow" should {
+ "establish a bidirectional connection and subscribe to a topic" in {
+ val topic = "v5/flow-spec/topic"
+ // #create-flow
+ val mqttFlow: Flow[MqttMessage, MqttMessage, Future[Done]] =
+ MqttFlow.atMostOnce(
+ connectionSettings.withClientId("flow-spec/flow"),
+ MqttSubscriptions(topic, MqttQoS.AtLeastOnce),
+ bufferSize = 8,
+ MqttQoS.AtLeastOnce)
+ // #create-flow
+
+ // Source.maybe emits nothing by itself; it materializes a promise the test completes later.
+ val source = Source.maybe[MqttMessage]
+
+ // #run-flow
+ val ((mqttMessagePromise, subscribed), result) = source
+ .viaMat(mqttFlow)(Keep.both)
+ .toMat(Sink.seq)(Keep.both)
+ .run()
+ // #run-flow
+
+ // Wait for the flow's materialized future, then complete the upstream promise with None and
+ // check that the future of collected elements does not fail.
+ subscribed.futureValue shouldBe Done
+ mqttMessagePromise.success(None)
+ noException should be thrownBy result.futureValue
+ }
+
+ "send an ack after sent confirmation" in {
+ val topic = "v5/flow-spec/topic-ack"
+
+ // #create-flow-ack
+ val mqttFlow: Flow[MqttMessageWithAck, MqttMessageWithAck, Future[Done]]
=
+ MqttFlow.atLeastOnceWithAck(
+ connectionSettings,
+ MqttSubscriptions(topic, MqttQoS.AtLeastOnce),
+ bufferSize = 8,
+ MqttQoS.AtLeastOnce)
+ // #create-flow-ack
+
+ // Completed from the fake message's ack() so the test can observe that ack() was invoked.
+ val acked = Promise[Done]()
+
+ // Test double: a fixed "ohi" message whose ack() records the call in `acked`.
+ class MqttMessageWithAckFake extends MqttMessageWithAck {
+ override val message: MqttMessage = MqttMessage.create(topic,
ByteString.fromString("ohi"))
+
+ override def ack(): Future[Done] = {
+ acked.trySuccess(Done)
+ Future.successful(Done)
+ }
+ }
+
+ val message = new MqttMessageWithAckFake
+
+ val source = Source.single(message)
+
+ // #run-flow-ack
+ val (subscribed, result) = source
+ .viaMat(mqttFlow)(Keep.right)
+ .toMat(Sink.seq)(Keep.both)
+ .run()
+
+ // #run-flow-ack
+ // The test expects no elements downstream, yet the fake's ack() must have been called.
+ subscribed.futureValue shouldBe Done
+ result.futureValue shouldBe empty
+
+ acked.future.futureValue shouldBe Done
+ }
+ }
+}
diff --git a/mqttv5/src/test/scala/docs/scaladsl/MqttSinkSpec.scala
b/mqttv5/src/test/scala/docs/scaladsl/MqttSinkSpec.scala
new file mode 100644
index 000000000..03b287eeb
--- /dev/null
+++ b/mqttv5/src/test/scala/docs/scaladsl/MqttSinkSpec.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.scaladsl
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.apache.pekko.Done
+import org.apache.pekko.stream.connectors.mqttv5
+import org.apache.pekko.stream.connectors.mqttv5.MqttMessage
+import org.apache.pekko.stream.connectors.mqttv5.MqttQoS
+import org.apache.pekko.stream.connectors.mqttv5.MqttSubscriptions
+import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttSink
+import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttSource
+import org.apache.pekko.stream.scaladsl.Keep
+import org.apache.pekko.stream.scaladsl.Sink
+import org.apache.pekko.stream.scaladsl.Source
+import org.apache.pekko.util.ByteString
+import org.eclipse.paho.mqttv5.common.MqttException
+
+class MqttSinkSpec extends MqttSpecBase("MqttSinkSpec") {
+ "mqtt sink" should {
+ "send one message to a topic" in {
+ val topic = "v5/sink-spec/topic1"
+
+ val msg = MqttMessage(topic, ByteString("ohi"))
+
+ val (subscribed, message) = MqttSource
+ .atMostOnce(connectionSettings.withClientId("sink-spec/source1"),
+ MqttSubscriptions(topic, MqttQoS.AtLeastOnce),
+ 8)
+ .toMat(Sink.head)(Keep.both)
+ .run()
+
+ Await.ready(subscribed, timeout)
+
Source.single(msg).runWith(MqttSink(connectionSettings.withClientId("sink-spec/sink1"),
+ MqttQoS.atLeastOnce))
+
+ message.futureValue shouldBe msg
+ }
+
+ "send multiple messages to a topic" in {
+ val topic = "v5/sink-spec/topic2"
+
+ val msg = MqttMessage(topic, ByteString("ohi"))
+ val numOfMessages = 5
+
+ val (subscribed, messagesFuture) =
+ MqttSource
+ .atMostOnce(connectionSettings.withClientId("sink-spec/source2"),
+ MqttSubscriptions(topic, MqttQoS.atLeastOnce),
+ 8)
+ .take(numOfMessages)
+ .toMat(Sink.seq)(Keep.both)
+ .run()
+
+ Await.ready(subscribed, timeout)
+ Source(1 to numOfMessages).map(_ => msg).runWith(
+ MqttSink(connectionSettings.withClientId("sink-spec/sink2"),
MqttQoS.atLeastOnce))
+
+ val messages = messagesFuture.futureValue
+ (messages should have).length(numOfMessages)
+ messages.foreach { _ shouldBe msg }
+ }
+
+ "connection should fail to wrong broker" in {
+ val secureTopic = "v5/sink-spec/secure-topic1"
+
+ val wrongConnectionSettings =
+
connectionSettings.withClientId("sink-spec/sink3").withBroker("tcp://localhost:1884")
+ val msg = MqttMessage(secureTopic, ByteString("ohi"))
+
+ val termination = Source
+ .single(msg)
+ .runWith(MqttSink(wrongConnectionSettings, MqttQoS.atLeastOnce))
+
+ termination.failed.futureValue shouldBe an[MqttException]
+ }
+
+ "fail to publish when credentials are not provided" in {
+ val secureTopic = "v5/sink-spec/secure-topic2"
+ val msg = MqttMessage(secureTopic, ByteString("ohi"))
+
+ val termination =
+
Source.single(msg).runWith(MqttSink(connectionSettings.withClientId("sink-spec/sink4").withAuth(
+ "username1", "bad_password"), MqttQoS.atLeastOnce))
+
+ whenReady(termination.failed) { ex =>
+ ex shouldBe an[MqttException]
+ ex.getMessage should include("Not authorized")
+ }
+ }
+
+ "publish when credentials are provided" in {
+ val secureTopic = "v5/sink-spec/secure-topic3"
+ val msg = MqttMessage(secureTopic, ByteString("ohi"))
+
+ val (subscribed, message) = MqttSource
+
.atMostOnce(connectionSettings.withClientId("sink-spec/source1").withAuth("username1",
+ "password1"),
+ MqttSubscriptions(secureTopic, MqttQoS.AtLeastOnce),
+ 8)
+ .toMat(Sink.head)(Keep.both)
+ .run()
+
+ Await.ready(subscribed, timeout)
+
+ val termination = Source
+ .single(msg)
+
.runWith(MqttSink(connectionSettings.withClientId("sink-spec/sink5").withAuth("username1",
+ "password1"), MqttQoS.atLeastOnce))
+
+ termination.futureValue shouldBe Done
+
+ message.futureValue shouldBe msg
+ }
+
+    "receive retained message on new client" in {
+      val topic = "v5/sink-spec/topic3"
+      val msg = MqttMessage(topic, ByteString("ohi")).withQos(MqttQoS.atLeastOnce).withRetained(true)
+
+      val messageSent = Source.single(msg).runWith(
+        MqttSink(connectionSettings.withClientId("sink-spec/sink6"), MqttQoS.atLeastOnce))
+
+      // Await.result rethrows a failed future, whereas Await.ready returns normally for it; asserting
+      // here makes a failed publish fail the test at its cause instead of as a later subscriber timeout.
+      Await.result(messageSent, 3.seconds) shouldBe Done
+
+      // The subscriber is only started once the publishing stream has completed.
+      val messageFuture =
+        MqttSource
+          .atMostOnce(connectionSettings.withClientId("source-spec/retained"),
+            mqttv5.MqttSubscriptions(topic, MqttQoS.atLeastOnce),
+            8)
+          .runWith(Sink.head)
+
+      // Only topic and payload are compared, not the whole message
+      // (presumably because delivery flags may differ from the published ones — confirm).
+      val message = messageFuture.futureValue
+      message.topic shouldBe msg.topic
+      message.payload shouldBe msg.payload
+    }
+
+ "fail to publish to an unauthorized topic" in {
+ val clientId = "sink-spec/sink7"
+ val topic = "v5/sink-spec/unauthorized"
+
+ val msg = MqttMessage(topic, ByteString("ohi"))
+ val numOfMessages = 5
+
+ val termination = Source(1 to numOfMessages).map(_ => msg).runWith(
+ MqttSink(connectionSettings.withClientId(clientId),
MqttQoS.atLeastOnce))
+
+ whenReady(termination.failed) { ex =>
+ ex shouldBe an[RuntimeException]
+ ex.getMessage should be(
+ s"Client [$clientId] received one or more error codes while
publishing on topic [$topic] " +
+ s"to broker [${connectionSettings.broker}]: [code=135]")
+ }
+ }
+ }
+}
diff --git a/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
b/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
new file mode 100644
index 000000000..5ae7b1b3a
--- /dev/null
+++ b/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.scaladsl
+
+import javax.net.ssl.SSLContext
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.Promise
+import scala.concurrent.duration._
+
+import org.apache.pekko.Done
+import org.apache.pekko.NotUsed
+import org.apache.pekko.stream._
+import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings
+import org.apache.pekko.stream.connectors.mqttv5.MqttMessage
+import org.apache.pekko.stream.connectors.mqttv5.MqttQoS
+import org.apache.pekko.stream.connectors.mqttv5.MqttSubscriptions
+import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttMessageWithAck
+import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttSink
+import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttSource
+import org.apache.pekko.stream.scaladsl._
+import org.apache.pekko.stream.testkit.scaladsl.TestSink
+import org.apache.pekko.util.ByteString
+import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence
+import org.eclipse.paho.mqttv5.common.MqttException
+import org.slf4j.LoggerFactory
+
+class MqttSourceSpec extends MqttSpecBase("MqttSourceSpec") {
+
+ private val log = LoggerFactory.getLogger(classOf[MqttSourceSpec])
+
+ private val topic1 = "v5/source-spec/topic1"
+
+ private val sourceSettings =
connectionSettings.withClientId("source-spec/source")
+ private val sinkSettings =
connectionSettings.withClientId("source-spec/sink")
+
+  /**
+   * Wraps a source with restart logic and exposes an equivalent materialized value.
+   *
+   * `source` is passed by name and is evaluated inside the factory handed to
+   * `RestartSource.withBackoff`, so each (re)start builds and materializes a fresh inner source.
+   * The returned future is completed by the first inner materialized future that finishes;
+   * the outcomes of later restarts are ignored.
+   *
+   * Could be simplified when https://github.com/akka/akka/issues/24771 is solved.
+   *
+   * @param source the source to (re)create on every start
+   * @param ec     execution context on which the completion callback runs
+   * @tparam M     element type of the wrapped source
+   * @return a restarting source whose materialized value is the shared subscription future
+   */
+  def wrapWithRestart[M](
+      source: => Source[M, Future[Done]])(implicit ec: ExecutionContext): Source[M, Future[Done]] = {
+    val subscribed = Promise[Done]()
+    val restartSettings =
+      RestartSettings(minBackoff = 100.millis, maxBackoff = 3.seconds, randomFactor = 0.2)
+        .withMaxRestarts(5, 1.second)
+
+    RestartSource
+      .withBackoff(restartSettings) { () =>
+        source.mapMaterializedValue { f =>
+          // tryComplete instead of complete: after a restart the promise is already completed, and
+          // Promise.complete would then throw IllegalStateException inside this callback.
+          f.onComplete(res => subscribed.tryComplete(res))
+        }
+      }
+      .mapMaterializedValue(_ => subscribed.future)
+  }
+
+ "MQTT connection settings" should {
+ "accept standard things" in {
+ // #create-connection-settings
+ val connectionSettings = MqttConnectionSettings(
+ broker = "tcp://localhost:1883",
+ "test-scala-client",
+ persistence = new MemoryPersistence
+ )
+ // #create-connection-settings
+ connectionSettings.toString should include("tcp://localhost:1883")
+ }
+
+ "allow SSL" in {
+ // #ssl-settings
+ val connectionSettings = MqttConnectionSettings(
+ broker = "ssl://localhost:1885",
+ "ssl-client",
+ persistence = new MemoryPersistence).withAuth("mqttUser",
"mqttPassword")
+ .withSocketFactory(SSLContext.getDefault.getSocketFactory)
+ // #ssl-settings
+ connectionSettings.toString should include("ssl://localhost:1885")
+ connectionSettings.asMqttConnectionOptions().getUserName should
be("mqttUser")
+ }
+
+    "allow MQTT buffering offline support persistence" in {
+      // Fragment the settings' toString is expected to contain for the values configured below.
+      val expectedRendering =
+        "offlinePersistence=Some(MqttOfflinePersistenceSettings(1234,true,false))"
+
+      // #OfflinePersistenceSettings
+      val bufferedConnectionSettings = MqttConnectionSettings(
+        broker = "ssl://localhost:1885",
+        "ssl-client",
+        persistence = new MemoryPersistence
+      ).withOfflinePersistenceSettings(
+        bufferSize = 1234,
+        deleteOldestMessage = true,
+        persistBuffer = false
+      )
+      // #OfflinePersistenceSettings
+      // NOTE(review): the marker pair presumably delimits a documentation snippet region — confirm.
+
+      bufferedConnectionSettings.toString should include(expectedRendering)
+    }
+ }
+
+ "mqtt source" should {
+ "consume unacknowledged messages from previous sessions using manualAck"
in {
+ import system.dispatcher
+
+ val topic = "v5/source-spec/manualacks"
+ val input = Vector("one", "two", "three", "four", "five")
+
+ // #create-source-with-manualacks
+ val mqttSource: Source[MqttMessageWithAck, Future[Done]] =
+ MqttSource.atLeastOnce(
+ connectionSettings
+ .withClientId("source-spec/source1")
+ .withCleanStart(false),
+ MqttSubscriptions(topic, MqttQoS.AtLeastOnce),
+ bufferSize = 8)
+ // #create-source-with-manualacks
+
+ val (subscribed, unackedResult) =
mqttSource.take(input.size).toMat(Sink.seq)(Keep.both).run()
+ val mqttSink = MqttSink(sinkSettings, MqttQoS.AtLeastOnce)
+
+ Await.ready(subscribed, timeout)
+ Source(input).map(item => MqttMessage(topic,
ByteString(item))).runWith(mqttSink)
+
+ unackedResult.futureValue.map(message =>
message.message.payload.utf8String) should equal(input)
+
+ val businessLogic: Flow[MqttMessageWithAck, MqttMessageWithAck, NotUsed]
= Flow[MqttMessageWithAck]
+
+ // #run-source-with-manualacks
+ val result = mqttSource
+ .via(businessLogic)
+ .mapAsync(1)(messageWithAck => messageWithAck.ack().map(_ =>
messageWithAck.message))
+ .take(input.size)
+ .runWith(Sink.seq)
+ // #run-source-with-manualacks
+ result.futureValue.map(message => message.payload.utf8String) should
equal(input)
+ }
+
+    "keep connection open if downstream closes and there are pending acks" in {
+      val topic = "v5/source-spec/pendingacks"
+      val payloads = Vector("one", "two", "three", "four", "five")
+
+      // At-least-once source with cleanStart disabled; its messages have to be acked explicitly.
+      val ackingSource = MqttSource.atLeastOnce(
+        sourceSettings.withCleanStart(false),
+        MqttSubscriptions(topic, MqttQoS.AtLeastOnce),
+        8)
+
+      // take(n) completes the stream after n messages, none of which has been acked yet.
+      val (subscribed, pendingMessages) =
+        ackingSource.take(payloads.size).toMat(Sink.seq)(Keep.both).run()
+
+      Await.ready(subscribed, timeout)
+      val published = Source(payloads)
+        .map(text => MqttMessage(topic, ByteString(text)))
+        .runWith(MqttSink(sinkSettings, MqttQoS.AtLeastOnce))
+      published.futureValue shouldBe Done
+
+      // Acking after the downstream has completed must still succeed for every message.
+      for (pending <- pendingMessages.futureValue) {
+        noException should be thrownBy pending.ack().futureValue
+      }
+    }
+
+ "receive a message from a topic" in {
+ val msg = MqttMessage(topic1, ByteString("ohi"))
+
+ val subscriptions = MqttSubscriptions(topic1, MqttQoS.AtLeastOnce)
+ val (subscribed, result) = MqttSource
+ .atMostOnce(sourceSettings, subscriptions, 8)
+ .toMat(Sink.head)(Keep.both)
+ .run()
+
+ Await.ready(subscribed, timeout)
+ Source.single(msg).runWith(MqttSink(sinkSettings, MqttQoS.AtLeastOnce))
+
+ result.futureValue shouldBe msg
+ }
+
+ "receive messages from multiple topics" in {
+ val topic2 = "v5/source-spec/topic2"
+ val messages = (0 until 7)
+ .flatMap(i =>
+ Seq(
+ MqttMessage(topic1, ByteString(s"ohi_$i")),
+ MqttMessage(topic2, ByteString(s"ohi_$i"))))
+
+ // #create-source
+ val mqttSource: Source[MqttMessage, Future[Done]] =
+ MqttSource.atMostOnce(
+ connectionSettings.withClientId("source-spec/source"),
+ MqttSubscriptions(Map(topic1 -> MqttQoS.AtLeastOnce, topic2 ->
MqttQoS.AtLeastOnce)),
+ bufferSize = 8)
+
+ val (subscribed, streamResult) = mqttSource
+ .take(messages.size)
+ .toMat(Sink.seq)(Keep.both)
+ .run()
+ // #create-source
+
+ Await.ready(subscribed, timeout)
+ // #run-sink
+ val sink: Sink[MqttMessage, Future[Done]] =
+ MqttSink(connectionSettings, MqttQoS.AtLeastOnce)
+ Source(messages).runWith(sink)
+ // #run-sink
+
+ streamResult.futureValue shouldBe messages
+ }
+
+ "connection should fail to wrong broker" in {
+ val wrongConnectionSettings =
connectionSettings.withBroker("tcp://localhost:1884")
+
+ val (subscribed, _) = MqttSource
+ .atMostOnce(wrongConnectionSettings, MqttSubscriptions(topic1,
MqttQoS.atLeastOnce), 8)
+ .toMat(Sink.head)(Keep.both)
+ .run()
+
+ subscribed.failed.futureValue shouldBe an[MqttException]
+ }
+
+ "fail connection when not providing the requested credentials" in {
+ val secureTopic = "v5/source-spec/secure-topic1"
+ val first = MqttSource
+ .atMostOnce(sourceSettings.withAuth("username1", "bad_password"),
+ MqttSubscriptions(secureTopic, MqttQoS.AtLeastOnce),
+ 8)
+ .runWith(Sink.head)
+
+ whenReady(first.failed) {
+ case e: MqttException => e.getMessage should include("Not authorized")
+ case e => throw e
+ }
+ }
+
+ "receive a message from a topic with right credentials" in {
+ val secureTopic = "v5/source-spec/secure-topic2"
+ val msg = MqttMessage(secureTopic, ByteString("ohi"))
+
+ val (subscribed, result) = MqttSource
+ .atMostOnce(sourceSettings.withAuth("username1", "password1"),
+ MqttSubscriptions(secureTopic, MqttQoS.AtLeastOnce),
+ 8)
+ .toMat(Sink.head)(Keep.both)
+ .run()
+
+ Await.ready(subscribed, timeout)
+ Source.single(msg).runWith(MqttSink(sinkSettings.withAuth("username1",
"password1"), MqttQoS.AtLeastOnce))
+
+ result.futureValue shouldBe msg
+ }
+
+ "signal backpressure" in {
+ val bufferSize = 8
+ val overflow = 4
+ val messages = (1 until bufferSize + overflow)
+ .map(i => s"ohi_$i")
+
+ val (subscribed, result) = MqttSource
+ .atMostOnce(sourceSettings, MqttSubscriptions(topic1,
MqttQoS.AtLeastOnce), bufferSize)
+ .take(messages.size)
+ .toMat(Sink.seq)(Keep.both)
+ .run()
+
+ Await.ready(subscribed, timeout)
+ Source(messages)
+ .map(m => MqttMessage(topic1, ByteString(m)))
+ .runWith(MqttSink(sinkSettings, MqttQoS.AtLeastOnce))
+
+ result.futureValue.map(m => m.payload.utf8String) shouldBe messages
+ }
+
+ "work with fast downstream" in {
+ val bufferSize = 8
+ val overflow = 4
+ val messages = (1 until bufferSize + overflow)
+ .map(i => s"ohi_$i")
+
+ val (subscribed, result) = MqttSource
+ .atMostOnce(sourceSettings, MqttSubscriptions(topic1,
MqttQoS.AtLeastOnce), bufferSize)
+ .take(messages.size)
+ .toMat(Sink.seq)(Keep.both)
+ .run()
+
+ Await.ready(subscribed, timeout)
+ Source(messages)
+ .map(m => MqttMessage(topic1, ByteString(m)))
+ .runWith(MqttSink(sinkSettings, MqttQoS.AtLeastOnce))
+
+ result.futureValue.map(m => m.payload.utf8String) shouldBe messages
+ }
+
+ "support multiple materialization" in {
+ val source = MqttSource.atMostOnce(sourceSettings,
MqttSubscriptions(topic1, MqttQoS.AtLeastOnce), 8)
+
+ val (subscribed, elem) = source.toMat(Sink.head)(Keep.both).run()
+
+ Await.ready(subscribed, timeout)
+ Source.single(MqttMessage(topic1,
ByteString("ohi"))).runWith(MqttSink(sinkSettings, MqttQoS.AtLeastOnce))
+ elem.futureValue shouldBe MqttMessage(topic1, ByteString("ohi"))
+
+ val (subscribed2, elem2) = source.toMat(Sink.head)(Keep.both).run()
+
+ Await.ready(subscribed2, timeout)
+ Source.single(MqttMessage(topic1,
ByteString("ohi"))).runWith(MqttSink(sinkSettings, MqttQoS.AtLeastOnce))
+ elem2.futureValue shouldBe MqttMessage(topic1, ByteString("ohi"))
+ }
+
+ "automatically reconnect" in {
+ import system.dispatcher
+
+ val msg = MqttMessage(topic1, ByteString("ohi"))
+
+ // Create a proxy on an available port so it can be shut down
+ val (proxyBinding, connection) = Tcp().bind("localhost",
0).toMat(Sink.head)(Keep.both).run()
+ val proxyPort = proxyBinding.futureValue.localAddress.getPort
+ val proxyKs = connection.map { c =>
+ c.handleWith(
+ Tcp()
+ .outgoingConnection("localhost", 1883)
+ .viaMat(KillSwitches.single)(Keep.right))
+ }
+ Await.ready(proxyBinding, timeout)
+
+ val (subscribed, probe) = MqttSource
+ .atMostOnce(
+ sourceSettings
+ .withAutomaticReconnect(true)
+ .withCleanStart(false)
+ .withBroker(s"tcp://localhost:$proxyPort"),
+ MqttSubscriptions(topic1, MqttQoS.AtLeastOnce),
+ 8)
+ .toMat(TestSink.probe)(Keep.both)
+ .run()
+
+ // Ensure that the connection made it all the way to the server by
waiting until it receives a message
+ Await.ready(subscribed, timeout)
+
+ Source.single(msg).runWith(MqttSink(sinkSettings, MqttQoS.AtLeastOnce))
+
+ try {
+ probe.requestNext()
+ } catch {
+ case e: Exception =>
+ log.debug(s"Ignoring $e", e)
+ }
+ // Kill the proxy, producing an unexpected disconnection of the client
+ Await.result(proxyKs, timeout).shutdown()
+
+ // Restart the proxy
+ val (proxyBinding2, connection2) = Tcp().bind("localhost",
proxyPort).toMat(Sink.head)(Keep.both).run()
+ val proxyKs2 = connection2.map { c =>
+ c.handleWith(
+ Tcp()
+ .outgoingConnection("localhost", 1883)
+ .viaMat(KillSwitches.single)(Keep.right))
+ }
+ Await.ready(proxyBinding2, timeout)
+
+ Source.single(msg).runWith(MqttSink(sinkSettings, MqttQoS.AtLeastOnce))
+ probe.requestNext(5.seconds) shouldBe msg
+ Await.result(proxyKs2, timeout).shutdown()
+ }
+
+ "support will message" in {
+ import system.dispatcher
+
+ val willTopic = "v5/source-spec/will"
+ val msg = MqttMessage(topic1, ByteString("ohi"))
+
+ // #will-message
+ val lastWill = MqttMessage(willTopic, ByteString("ohi"))
+ .withQos(MqttQoS.AtLeastOnce)
+ .withRetained(true)
+ // #will-message
+
+ // Create a proxy on an available port so it can be shut down
+ val (proxyBinding, connection) = Tcp().bind("localhost",
0).toMat(Sink.head)(Keep.both).run()
+ val proxyPort = proxyBinding.futureValue.localAddress.getPort
+ val proxyKs = connection.map { c =>
+ c.handleWith(
+ Tcp()
+ .outgoingConnection("localhost", 1883)
+ .viaMat(KillSwitches.single)(Keep.right))
+ }
+ Await.ready(proxyBinding, timeout)
+
+ val source1 = wrapWithRestart(
+ MqttSource
+ .atMostOnce(
+ sourceSettings
+ .withClientId("source-spec/testator")
+ .withBroker(s"tcp://localhost:$proxyPort")
+ .withWill(lastWill),
+ MqttSubscriptions(topic1, MqttQoS.AtLeastOnce),
+ 8))
+
+ val (subscribed, probe) = source1.toMat(TestSink.probe)(Keep.both).run()
+
+ // Ensure that the connection made it all the way to the server by
waiting until it receives a message
+ Await.ready(subscribed, timeout)
+ Source.single(msg).runWith(MqttSink(sinkSettings, MqttQoS.AtLeastOnce))
+ try {
+ probe.requestNext()
+ } catch {
+ case e: Exception =>
+ log.debug(s"Ignoring $e", e)
+ }
+
+ // Kill the proxy, producing an unexpected disconnection of the client
+ Await.result(proxyKs, timeout).shutdown()
+
+ val source2 =
MqttSource.atMostOnce(sourceSettings.withClientId("source-spec/executor"),
+ MqttSubscriptions(willTopic, MqttQoS.AtLeastOnce),
+ 8)
+
+ val elem = source2.runWith(Sink.head)
+ elem.futureValue shouldBe MqttMessage(willTopic, ByteString("ohi"))
+ }
+
+    "support buffering message on disconnect" in {
+      import system.dispatcher
+
+      val msg = MqttMessage(topic1, ByteString("ohi"))
+
+      // One shared switch stops both the first proxy connection and the first MQTT stream.
+      val sharedKillSwitch = KillSwitches.shared("buffered-test-kill-switch")
+
+      // Create a proxy on an available port so it can be shut down
+      val (proxyBinding, connection) = Tcp().bind("localhost", 0).toMat(Sink.head)(Keep.both).run()
+      val proxyPort = proxyBinding.futureValue.localAddress.getPort
+      connection.foreach { incoming =>
+        incoming.handleWith(
+          Tcp()
+            .outgoingConnection("localhost", 1883)
+            .via(sharedKillSwitch.flow))
+      }
+      Await.ready(proxyBinding, timeout)
+
+      // Both MQTT sources in this test connect through the proxy with identical settings.
+      val bufferedSettings = sourceSettings
+        .withCleanStart(false)
+        .withBroker(s"tcp://localhost:$proxyPort")
+        .withOfflinePersistenceSettings(bufferSize = 1234)
+      val subscriptions = MqttSubscriptions(topic1, MqttQoS.AtLeastOnce)
+
+      // Keep.both yields the source's subscription future together with the test probe.
+      val (firstSubscribed, probe) = MqttSource
+        .atMostOnce(bufferedSettings, subscriptions, 8)
+        .via(sharedKillSwitch.flow)
+        .toMat(TestSink.probe)(Keep.both)
+        .run()
+      Await.ready(firstSubscribed, timeout)
+
+      Source.single(msg).runWith(MqttSink(sinkSettings, MqttQoS.AtLeastOnce))
+      try {
+        probe.requestNext()
+      } catch {
+        case e: Exception =>
+          log.debug(s"Ignoring $e", e)
+      }
+      // Kill the proxy and stream
+      sharedKillSwitch.shutdown()
+
+      // Send message with connection and stream down
+      Source.single(msg).runWith(MqttSink(sinkSettings, MqttQoS.AtLeastOnce))
+
+      // Restart the proxy
+      val (proxyBinding2, connection2) = Tcp().bind("localhost", proxyPort).toMat(Sink.head)(Keep.both).run()
+      val proxyKs2 = connection2.map { incoming =>
+        incoming.handleWith(
+          Tcp()
+            .outgoingConnection("localhost", 1883)
+            .viaMat(KillSwitches.single)(Keep.right))
+      }
+      Await.ready(proxyBinding2, timeout)
+
+      // Rebuild MQTT connection to broker
+      val (resubscribed, probe2) = MqttSource
+        .atMostOnce(bufferedSettings, subscriptions, 8)
+        .toMat(TestSink.probe)(Keep.both)
+        .run()
+
+      // Ensure that the connection made it all the way to the server by waiting until it receives a message
+      Await.ready(resubscribed, timeout)
+
+      probe2.requestNext(5.seconds) shouldBe msg
+      Await.result(proxyKs2, timeout).shutdown()
+    }
+ }
+}
diff --git a/mqttv5/src/test/scala/docs/scaladsl/MqttSpecBase.scala
b/mqttv5/src/test/scala/docs/scaladsl/MqttSpecBase.scala
new file mode 100644
index 000000000..c031488be
--- /dev/null
+++ b/mqttv5/src/test/scala/docs/scaladsl/MqttSpecBase.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.scaladsl
+
+import scala.concurrent.duration._
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings
+import org.apache.pekko.stream.connectors.testkit.scaladsl.LogCapturing
+import org.apache.pekko.testkit.TestKit
+import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually
+import org.scalatest.concurrent.IntegrationPatience
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpecLike
+
+/**
+ * Common base for the MQTT v5 specs: starts an actor system named `<name>_v5`, mixes in the
+ * ScalaTest traits the specs rely on and provides default connection settings and a timeout.
+ *
+ * @param name spec name, used as the prefix of the actor system name
+ */
+abstract class MqttSpecBase(name: String)
+    extends TestKit(ActorSystem(s"${name}_v5"))
+    with AnyWordSpecLike
+    with Matchers
+    with BeforeAndAfterAll
+    with ScalaFutures
+    with Eventually
+    with IntegrationPatience
+    with LogCapturing {
+
+  /** Default settings: broker at `tcp://localhost:1883`, in-memory persistence; specs override the client id. */
+  val connectionSettings: MqttConnectionSettings = MqttConnectionSettings(
+    broker = "tcp://localhost:1883",
+    clientId = "test-client",
+    persistence = new MemoryPersistence
+  )
+
+  /** Upper bound used by the specs when blocking on futures with `Await`. */
+  val timeout: FiniteDuration = 5.seconds
+
+  /**
+   * Runs the `afterAll` hooks of the stacked traits first, then always shuts the actor system down.
+   * Without the `super` call any cleanup defined by a mixed-in trait would be skipped.
+   */
+  override def afterAll(): Unit =
+    try super.afterAll()
+    finally TestKit.shutdownActorSystem(system)
+
+}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 5a2da0811..a7aa97ab2 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -418,6 +418,10 @@ object Dependencies {
libraryDependencies ++= Seq(
"org.eclipse.paho" % "org.eclipse.paho.client.mqttv3" % "1.2.5"))
+ val MqttV5 = Seq(
+ libraryDependencies ++= Seq(
+ "org.eclipse.paho" % "org.eclipse.paho.mqttv5.client" % "1.2.5"))
+
val MqttStreaming = Seq(
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-actor-typed" % PekkoVersion,
diff --git a/project/project-info.conf b/project/project-info.conf
index ef3d19d0c..dd95b47b0 100644
--- a/project/project-info.conf
+++ b/project/project-info.conf
@@ -396,6 +396,17 @@ project-info {
}
]
}
+ mqttv5: ${project-info.shared-info} {
+ title: "Apache Pekko Connectors MQTT v5"
+ jpms-name: "pekko.stream.connectors.mqttv5"
+ issues.url: ${project-info.labels}"mqttv5"
+ api-docs: [
+ {
+ url: ${project-info.scaladoc}"mqttv5/index.html"
+ text: "API (Scaladoc)"
+ }
+ ]
+ }
mqtt-streaming: ${project-info.shared-info} {
title: "Apache Pekko Connectors MQTT Streaming"
jpms-name: "pekko.stream.connectors.mqttStreaming"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]