This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new 84d9ed6ef add Java API methods to mqttv5 classes (#1486)
84d9ed6ef is described below
commit 84d9ed6eff45142f52903ac84ed47e5bdb437308
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Mar 10 09:33:23 2026 +0100
add Java API methods to mqttv5 classes (#1486)
* add Java API methods to mqttv5 classes
* Update model.scala
* Update model.scala
---
.../pekko/stream/connectors/mqttv5/model.scala | 42 +++++++++++++++++-----
.../src/test/java/docs/javadsl/MqttSourceTest.java | 10 +++---
2 files changed, 39 insertions(+), 13 deletions(-)
diff --git a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
index 6c8d594c0..c49cad7f9 100644
--- a/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
+++ b/mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/model.scala
@@ -17,8 +17,16 @@ import org.apache.pekko
import scala.collection.immutable
import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters._
final class MqttUserProperty private (val key: String, val value: String) {
+
+ /** Java API */
+ def getKey(): String = key
+
+ /** Java API */
+ def getValue(): String = value
+
override def toString = s"MqttUserProperty(key=$key,value=$value)"
override def equals(other: Any): Boolean = other match {
@@ -45,7 +53,7 @@ final class MqttMessage private (
val payload: org.apache.pekko.util.ByteString,
val qos: Option[MqttQoS],
val retained: Boolean,
- val userProperties: Array[MqttUserProperty]
+ val userProperties: Seq[MqttUserProperty]
) {
def withTopic(value: String): MqttMessage = copy(topic = value)
@@ -56,18 +64,36 @@ final class MqttMessage private (
/** Scala API */
def withUserProperties(value: immutable.Seq[MqttUserProperty]): MqttMessage =
- copy(userProperties = value.toArray)
+ copy(userProperties = value)
/** Java API */
def withUserProperties(value: java.util.List[MqttUserProperty]): MqttMessage =
- copy(userProperties = value.asScala.toArray)
+ copy(userProperties = value.asScala.toSeq)
+
+ /**
+ * Java API. Returns the user properties.
+ * Modifying the returned list will not change the user properties of this message.
+ */
+ def getUserProperties(): java.util.List[MqttUserProperty] = userProperties.asJava
+
+ /** Java API */
+ def isRetained(): Boolean = retained
+
+ /** Java API */
+ def getQoS(): java.util.Optional[MqttQoS] = qos.toJava
+
+ /** Java API */
+ def getPayload(): org.apache.pekko.util.ByteString = payload
+
+ /** Java API */
+ def getTopic(): String = topic
private def copy(
topic: String = topic,
payload: pekko.util.ByteString = payload,
qos: Option[MqttQoS] = qos,
retained: Boolean = retained,
- userProperties: Array[MqttUserProperty] = userProperties): MqttMessage =
+ userProperties: Seq[MqttUserProperty] = userProperties): MqttMessage =
new MqttMessage(topic = topic, payload = payload, qos = qos, retained = retained, userProperties = userProperties)
override def toString =
@@ -80,12 +106,12 @@ final class MqttMessage private (
java.util.Objects.equals(this.payload, that.payload) &&
java.util.Objects.equals(this.qos, that.qos) &&
java.util.Objects.equals(this.retained, that.retained) &&
- java.util.Objects.equals(this.userProperties.toSeq, that.userProperties.toSeq)
+ java.util.Objects.equals(this.userProperties, that.userProperties)
case _ => false
}
override def hashCode(): Int =
- java.util.Objects.hash(topic, payload, qos, Boolean.box(retained), userProperties.toSeq)
+ java.util.Objects.hash(topic, payload, qos, Boolean.box(retained), userProperties)
}
object MqttMessage {
@@ -98,7 +124,7 @@ object MqttMessage {
payload,
qos = None,
retained = false,
- userProperties = Array.empty)
+ userProperties = Seq.empty)
/** Java API */
def create(
@@ -108,5 +134,5 @@ object MqttMessage {
payload,
qos = None,
retained = false,
- userProperties = Array.empty)
+ userProperties = Seq.empty)
}
diff --git a/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java b/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
index bae3e3771..e44732009 100644
--- a/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
+++ b/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
@@ -371,11 +371,11 @@ public class MqttSourceTest {
Source.single(msg).runWith(mqttSink, system);
MqttMessage received = result.second().toCompletableFuture().get(5, TimeUnit.SECONDS);
- List<MqttUserProperty> userProps = Arrays.asList(received.userProperties());
+ List<MqttUserProperty> userProps = received.getUserProperties();
assertEquals(2, userProps.size());
- assertEquals("x-trace-id", userProps.get(0).key());
- assertEquals("abc123", userProps.get(0).value());
- assertEquals("x-tenant", userProps.get(1).key());
- assertEquals("acme", userProps.get(1).value());
+ assertEquals("x-trace-id", userProps.get(0).getKey());
+ assertEquals("abc123", userProps.get(0).getValue());
+ assertEquals("x-tenant", userProps.get(1).getKey());
+ assertEquals("acme", userProps.get(1).getValue());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]