This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c12ec698 MQTT v5: expose MQTT 5 user properties on MqttMessage (#1480)
7c12ec698 is described below

commit 7c12ec698c3e0b3b0c8358dda39cb3610521b37b
Author: anton-curanz-mw <[email protected]>
AuthorDate: Fri Mar 6 18:20:46 2026 +0100

    MQTT v5: expose MQTT 5 user properties on MqttMessage (#1480)
    
    * Add MqttUserProperty model class (Scala/Java API)
    * Add userProperties field to MqttMessage with withUserProperties builder
    * Map incoming Paho user properties to MqttUserProperty on messageArrived
    * Map outgoing MqttUserProperty to Paho MqttProperties on publish
    * Add ACL entries and tests (Scala + Java) for user property round-trip
---
 mqtt/src/test/travis/acl                           |  2 +
 .../connectors/mqttv5/impl/MqttFlowStage.scala     | 17 ++++++-
 .../pekko/stream/connectors/mqttv5/model.scala     | 58 ++++++++++++++++++----
 .../src/test/java/docs/javadsl/MqttSourceTest.java | 40 +++++++++++++++
 .../test/scala/docs/scaladsl/MqttSourceSpec.scala  | 20 ++++++++
 5 files changed, 127 insertions(+), 10 deletions(-)

diff --git a/mqtt/src/test/travis/acl b/mqtt/src/test/travis/acl
index 8bf95d38d..104dfb743 100644
--- a/mqtt/src/test/travis/acl
+++ b/mqtt/src/test/travis/acl
@@ -23,6 +23,7 @@ topic v5/coffee/level
 topic v5/source-spec/will
 topic v5/source-spec/manualacks
 topic v5/source-spec/pendingacks
+topic v5/source-spec/user-props
 topic v5/sink-spec/topic1
 topic v5/sink-spec/topic2
 topic v5/sink-spec/topic3
@@ -32,6 +33,7 @@ topic v5/source-test/topic2
 topic v5/source-test/will
 topic v5/source-test/manualacks
 topic v5/source-test/pendingacks
+topic v5/source-test/user-props
 topic v5/flow-spec/topic-ack
 topic v5/flow-test/topic-ack
 topic v5/typed-flow-spec/topic1
diff --git a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala
index f60baa4a6..8b50124b6 100644
--- a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala
+++ b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala
@@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.mutable
 import scala.concurrent.Future
 import scala.concurrent.Promise
+import scala.jdk.CollectionConverters._
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
@@ -31,6 +32,7 @@ import org.apache.pekko.stream._
 import org.apache.pekko.stream.connectors.mqttv5.AuthSettings
 import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings
 import org.apache.pekko.stream.connectors.mqttv5.MqttMessage
+import org.apache.pekko.stream.connectors.mqttv5.MqttUserProperty
 import org.apache.pekko.stream.connectors.mqttv5.MqttOfflinePersistenceSettings
 import org.apache.pekko.stream.connectors.mqttv5.MqttQoS
 import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttMessageWithAck
@@ -46,6 +48,7 @@ import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse
 import org.eclipse.paho.mqttv5.common.MqttException
 import org.eclipse.paho.mqttv5.common.packet.MqttProperties
 import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode
+import org.eclipse.paho.mqttv5.common.packet.UserProperty
 import org.eclipse.paho.mqttv5.common.{ MqttMessage => PahoMqttMessage }
 
 /**
@@ -235,8 +238,14 @@ abstract class MqttFlowStageLogic[I](
     new MqttCallback {
       override def messageArrived(topic: String, pahoMessage: 
PahoMqttMessage): Unit = {
         backpressurePahoClient.acquire()
+        val userProps: Seq[MqttUserProperty] =
+          Option(pahoMessage.getProperties)
+            .map(_.getUserProperties.asScala.map(p => 
MqttUserProperty(p.getKey, p.getValue)).toList)
+            .getOrElse(Nil)
         val message = new MqttMessageWithAck {
-          override val message: MqttMessage = MqttMessage(topic, 
ByteString.fromArrayUnsafe(pahoMessage.getPayload))
+          override val message: MqttMessage =
+            MqttMessage(topic, 
ByteString.fromArrayUnsafe(pahoMessage.getPayload))
+              .withUserProperties(userProps)
 
           override def ack(): Future[Done] = {
             val promise = Promise[Done]()
@@ -404,6 +413,12 @@ abstract class MqttFlowStageLogic[I](
     pahoMsg.setQos(msg.qos.getOrElse(defaultQoS).value)
     pahoMsg.setRetained(msg.retained)
 
+    if (msg.userProperties.nonEmpty) {
+      val pahoProps = new MqttProperties()
+      pahoProps.setUserProperties(msg.userProperties.map(p => new 
UserProperty(p.key, p.value)).toList.asJava)
+      pahoMsg.setProperties(pahoProps)
+    }
+
     mqttClient.publish(
       msg.topic,
       pahoMsg,
diff --git a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
index 106488b16..6c8d594c0 100644
--- a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
+++ b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
@@ -15,11 +15,37 @@ package org.apache.pekko.stream.connectors.mqttv5
 
 import org.apache.pekko
 
+import scala.collection.immutable
+import scala.jdk.CollectionConverters._
+
+final class MqttUserProperty private (val key: String, val value: String) {
+  override def toString = s"MqttUserProperty(key=$key,value=$value)"
+
+  override def equals(other: Any): Boolean = other match {
+    case that: MqttUserProperty =>
+      java.util.Objects.equals(this.key, that.key) &&
+      java.util.Objects.equals(this.value, that.value)
+    case _ => false
+  }
+
+  override def hashCode(): Int = java.util.Objects.hash(key, value)
+}
+
+object MqttUserProperty {
+
+  /** Scala API */
+  def apply(key: String, value: String): MqttUserProperty = new 
MqttUserProperty(key, value)
+
+  /** Java API */
+  def create(key: String, value: String): MqttUserProperty = new 
MqttUserProperty(key, value)
+}
+
 final class MqttMessage private (
     val topic: String,
     val payload: org.apache.pekko.util.ByteString,
     val qos: Option[MqttQoS],
-    val retained: Boolean
+    val retained: Boolean,
+    val userProperties: Array[MqttUserProperty]
 ) {
 
   def withTopic(value: String): MqttMessage = copy(topic = value)
@@ -28,26 +54,38 @@ final class MqttMessage private (
   def withQos(value: MqttQoS): MqttMessage = copy(qos = Option(value))
   def withRetained(value: Boolean): MqttMessage = if (retained == value) this 
else copy(retained = value)
 
-  private def copy(topic: String = topic,
+  /** Scala API */
+  def withUserProperties(value: immutable.Seq[MqttUserProperty]): MqttMessage =
+    copy(userProperties = value.toArray)
+
+  /** Java API */
+  def withUserProperties(value: java.util.List[MqttUserProperty]): MqttMessage 
=
+    copy(userProperties = value.asScala.toArray)
+
+  private def copy(
+      topic: String = topic,
       payload: pekko.util.ByteString = payload,
       qos: Option[MqttQoS] = qos,
-      retained: Boolean = retained): MqttMessage =
-    new MqttMessage(topic = topic, payload = payload, qos = qos, retained = 
retained)
+      retained: Boolean = retained,
+      userProperties: Array[MqttUserProperty] = userProperties): MqttMessage =
+    new MqttMessage(topic = topic, payload = payload, qos = qos, retained = 
retained, userProperties = userProperties)
 
   override def toString =
-    
s"""MqttMessage(topic=$topic,payload=$payload,qos=$qos,retained=$retained)"""
+    
s"""MqttMessage(topic=$topic,payload=$payload,qos=$qos,retained=$retained,userProperties=${userProperties.mkString(
+        "[", ", ", "]")})"""
 
   override def equals(other: Any): Boolean = other match {
     case that: MqttMessage =>
       java.util.Objects.equals(this.topic, that.topic) &&
       java.util.Objects.equals(this.payload, that.payload) &&
       java.util.Objects.equals(this.qos, that.qos) &&
-      java.util.Objects.equals(this.retained, that.retained)
+      java.util.Objects.equals(this.retained, that.retained) &&
+      java.util.Objects.equals(this.userProperties.toSeq, 
that.userProperties.toSeq)
     case _ => false
   }
 
   override def hashCode(): Int =
-    java.util.Objects.hash(topic, payload, qos, Boolean.box(retained))
+    java.util.Objects.hash(topic, payload, qos, Boolean.box(retained), 
userProperties.toSeq)
 }
 
 object MqttMessage {
@@ -59,7 +97,8 @@ object MqttMessage {
     topic,
     payload,
     qos = None,
-    retained = false)
+    retained = false,
+    userProperties = Array.empty)
 
   /** Java API */
   def create(
@@ -68,5 +107,6 @@ object MqttMessage {
     topic,
     payload,
     qos = None,
-    retained = false)
+    retained = false,
+    userProperties = Array.empty)
 }
diff --git a/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java b/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
index 395a88aed..bae3e3771 100644
--- a/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
+++ b/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
@@ -23,6 +23,7 @@ import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings;
 import org.apache.pekko.stream.connectors.mqttv5.MqttMessage;
 import org.apache.pekko.stream.connectors.mqttv5.MqttQoS;
 import org.apache.pekko.stream.connectors.mqttv5.MqttSubscriptions;
+import org.apache.pekko.stream.connectors.mqttv5.MqttUserProperty;
 import org.apache.pekko.stream.connectors.mqttv5.javadsl.MqttMessageWithAck;
 import org.apache.pekko.stream.connectors.mqttv5.javadsl.MqttSink;
 import org.apache.pekko.stream.connectors.mqttv5.javadsl.MqttSource;
@@ -50,6 +51,8 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import scala.jdk.javaapi.CollectionConverters;
+
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -338,4 +341,41 @@ public class MqttSourceTest {
                 MqttMessage.create(willTopic, ByteString.fromString("ohi")),
                 elem.toCompletableFuture().get(3, TimeUnit.SECONDS));
     }
+
+    @Test
+    public void receiveUserProperties() throws Exception {
+        final String topic = "v5/source-test/user-props";
+        MqttConnectionSettings connectionSettings =
+                MqttConnectionSettings.create("tcp://localhost:1883", 
"test-java-user-props", new MemoryPersistence());
+
+        Source<MqttMessage, CompletionStage<Done>> source =
+                MqttSource.atMostOnce(
+                        
connectionSettings.withClientId("source-test/user-props-source"),
+                        MqttSubscriptions.create(topic, MqttQoS.atLeastOnce()),
+                        bufferSize);
+
+        Pair<CompletionStage<Done>, CompletionStage<MqttMessage>> result =
+                source.toMat(Sink.head(), Keep.both()).run(system);
+
+        result.first().toCompletableFuture().get(5, TimeUnit.SECONDS);
+
+        List<MqttUserProperty> userPropsToSend = Arrays.asList(
+                MqttUserProperty.create("x-trace-id", "abc123"),
+                MqttUserProperty.create("x-tenant", "acme"));
+        MqttMessage msg = MqttMessage.create(topic, 
ByteString.fromString("test"))
+                .withUserProperties(userPropsToSend);
+        Sink<MqttMessage, CompletionStage<Done>> mqttSink =
+                MqttSink.create(
+                        
connectionSettings.withClientId("source-test/user-props-sink"),
+                        MqttQoS.atLeastOnce());
+        Source.single(msg).runWith(mqttSink, system);
+
+        MqttMessage received = result.second().toCompletableFuture().get(5, 
TimeUnit.SECONDS);
+        List<MqttUserProperty> userProps = 
Arrays.asList(received.userProperties());
+        assertEquals(2, userProps.size());
+        assertEquals("x-trace-id", userProps.get(0).key());
+        assertEquals("abc123", userProps.get(0).value());
+        assertEquals("x-tenant", userProps.get(1).key());
+        assertEquals("acme", userProps.get(1).value());
+    }
 }
diff --git a/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala b/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
index bedd05ab3..7b875d171 100644
--- a/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
+++ b/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
@@ -28,6 +28,7 @@ import org.apache.pekko.stream.connectors.mqttv5.MqttConnectionSettings
 import org.apache.pekko.stream.connectors.mqttv5.MqttMessage
 import org.apache.pekko.stream.connectors.mqttv5.MqttQoS
 import org.apache.pekko.stream.connectors.mqttv5.MqttSubscriptions
+import org.apache.pekko.stream.connectors.mqttv5.MqttUserProperty
 import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttMessageWithAck
 import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttSink
 import org.apache.pekko.stream.connectors.mqttv5.scaladsl.MqttSource
@@ -492,4 +493,23 @@ class MqttSourceSpec extends MqttSpecBase("MqttSourceSpec") {
       Await.result(proxyKs2, timeout).shutdown()
     }
   }
+
+  "receive user properties from a message" in {
+    val topic = "v5/source-spec/user-props"
+    val expectedProps = Seq(
+      MqttUserProperty("x-trace-id", "abc123"),
+      MqttUserProperty("x-tenant", "acme"))
+
+    val (subscribed, result) = MqttSource
+      .atMostOnce(sourceSettings, MqttSubscriptions(topic, 
MqttQoS.AtLeastOnce), 8)
+      .toMat(Sink.head)(Keep.both)
+      .run()
+
+    Await.ready(subscribed, timeout)
+
+    val msg = MqttMessage(topic, 
ByteString("test")).withUserProperties(expectedProps)
+    Source.single(msg).runWith(MqttSink(sinkSettings, MqttQoS.AtLeastOnce))
+
+    result.futureValue.userProperties shouldBe expectedProps
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to