This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new 84d9ed6ef add Java API methods to mqttv5 classes (#1486)
84d9ed6ef is described below

commit 84d9ed6eff45142f52903ac84ed47e5bdb437308
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Mar 10 09:33:23 2026 +0100

    add Java API methods to mqttv5 classes (#1486)
    
    * add Java API methods to mqttv5 classes
    
    * Update model.scala
    
    * Update model.scala
---
 .../pekko/stream/connectors/mqttv5/model.scala     | 42 +++++++++++++++++-----
 .../src/test/java/docs/javadsl/MqttSourceTest.java | 10 +++---
 2 files changed, 39 insertions(+), 13 deletions(-)

diff --git a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
index 6c8d594c0..c49cad7f9 100644
--- a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
+++ b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
@@ -17,8 +17,16 @@ import org.apache.pekko
 
 import scala.collection.immutable
 import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters._
 
 final class MqttUserProperty private (val key: String, val value: String) {
+
+  /** Java API */
+  def getKey(): String = key
+
+  /** Java API */
+  def getValue(): String = value
+
   override def toString = s"MqttUserProperty(key=$key,value=$value)"
 
   override def equals(other: Any): Boolean = other match {
@@ -45,7 +53,7 @@ final class MqttMessage private (
     val payload: org.apache.pekko.util.ByteString,
     val qos: Option[MqttQoS],
     val retained: Boolean,
-    val userProperties: Array[MqttUserProperty]
+    val userProperties: Seq[MqttUserProperty]
 ) {
 
   def withTopic(value: String): MqttMessage = copy(topic = value)
@@ -56,18 +64,36 @@ final class MqttMessage private (
 
   /** Scala API */
   def withUserProperties(value: immutable.Seq[MqttUserProperty]): MqttMessage =
-    copy(userProperties = value.toArray)
+    copy(userProperties = value)
 
   /** Java API */
   def withUserProperties(value: java.util.List[MqttUserProperty]): MqttMessage =
-    copy(userProperties = value.asScala.toArray)
+    copy(userProperties = value.asScala.toSeq)
+
+  /**
+   * Java API. Returns the user properties.
+   * Modifying the returned list will not change the user properties of this message.
+   */
+  def getUserProperties(): java.util.List[MqttUserProperty] = userProperties.asJava
+
+  /** Java API */
+  def isRetained(): Boolean = retained
+
+  /** Java API */
+  def getQoS(): java.util.Optional[MqttQoS] = qos.toJava
+
+  /** Java API */
+  def getPayload(): org.apache.pekko.util.ByteString = payload
+
+  /** Java API */
+  def getTopic(): String = topic
 
   private def copy(
       topic: String = topic,
       payload: pekko.util.ByteString = payload,
       qos: Option[MqttQoS] = qos,
       retained: Boolean = retained,
-      userProperties: Array[MqttUserProperty] = userProperties): MqttMessage =
+      userProperties: Seq[MqttUserProperty] = userProperties): MqttMessage =
     new MqttMessage(topic = topic, payload = payload, qos = qos, retained = retained, userProperties = userProperties)
 
   override def toString =
@@ -80,12 +106,12 @@ final class MqttMessage private (
       java.util.Objects.equals(this.payload, that.payload) &&
       java.util.Objects.equals(this.qos, that.qos) &&
       java.util.Objects.equals(this.retained, that.retained) &&
-      java.util.Objects.equals(this.userProperties.toSeq, that.userProperties.toSeq)
+      java.util.Objects.equals(this.userProperties, that.userProperties)
     case _ => false
   }
 
   override def hashCode(): Int =
-    java.util.Objects.hash(topic, payload, qos, Boolean.box(retained), userProperties.toSeq)
+    java.util.Objects.hash(topic, payload, qos, Boolean.box(retained), userProperties)
 }
 
 object MqttMessage {
@@ -98,7 +124,7 @@ object MqttMessage {
     payload,
     qos = None,
     retained = false,
-    userProperties = Array.empty)
+    userProperties = Seq.empty)
 
   /** Java API */
   def create(
@@ -108,5 +134,5 @@ object MqttMessage {
     payload,
     qos = None,
     retained = false,
-    userProperties = Array.empty)
+    userProperties = Seq.empty)
 }
diff --git a/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java b/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
index bae3e3771..e44732009 100644
--- a/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
+++ b/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
@@ -371,11 +371,11 @@ public class MqttSourceTest {
         Source.single(msg).runWith(mqttSink, system);
 
         MqttMessage received = result.second().toCompletableFuture().get(5, TimeUnit.SECONDS);
-        List<MqttUserProperty> userProps = Arrays.asList(received.userProperties());
+        List<MqttUserProperty> userProps = received.getUserProperties();
         assertEquals(2, userProps.size());
-        assertEquals("x-trace-id", userProps.get(0).key());
-        assertEquals("abc123", userProps.get(0).value());
-        assertEquals("x-tenant", userProps.get(1).key());
-        assertEquals("acme", userProps.get(1).value());
+        assertEquals("x-trace-id", userProps.get(0).getKey());
+        assertEquals("abc123", userProps.get(0).getValue());
+        assertEquals("x-tenant", userProps.get(1).getKey());
+        assertEquals("acme", userProps.get(1).getValue());
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to