This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 498197b31e4 Add mqtt IT for json mode. (#17090)
498197b31e4 is described below

commit 498197b31e4134f5597d55b42a11d822f7b27f10
Author: wenyanshi-123 <[email protected]>
AuthorDate: Thu Jan 29 11:30:30 2026 +0800

    Add mqtt IT for json mode. (#17090)
---
 .../iotdb/db/it/mqtt/IoTDBMQTTServiceJsonIT.java   | 363 +++++++++++++++++++++
 .../relational/it/mqtt/IoTDBMQTTServiceIT.java     |   1 +
 2 files changed, 364 insertions(+)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/mqtt/IoTDBMQTTServiceJsonIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/mqtt/IoTDBMQTTServiceJsonIT.java
new file mode 100644
index 00000000000..b9fdb5e1980
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/mqtt/IoTDBMQTTServiceJsonIT.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.it.mqtt;
+
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import org.apache.tsfile.read.common.Field;
+import org.apache.tsfile.read.common.RowRecord;
+import org.awaitility.Awaitility;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for MQTT service with JSON payload formatter. JSON 
formatter supports tree
+ * model data insertion.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBMQTTServiceJsonIT {
+
+  private BlockingConnection connection;
+  private static final String IP = System.getProperty("RemoteIp", "127.0.0.1");
+  private static final String USER = System.getProperty("RemoteUser", "root");
+  private static final String PASSWORD = System.getProperty("RemotePassword", 
"root");
+  public static final String FORMATTER = "json";
+
+  @Before
+  public void setUp() throws Exception {
+    BaseEnv baseEnv = EnvFactory.getEnv();
+    baseEnv.getConfig().getDataNodeConfig().setEnableMQTTService(true);
+    baseEnv.getConfig().getDataNodeConfig().setMqttPayloadFormatter(FORMATTER);
+    baseEnv.initClusterEnvironment();
+    DataNodeWrapper portConflictDataNodeWrapper = 
EnvFactory.getEnv().getDataNodeWrapper(0);
+    int port = portConflictDataNodeWrapper.getMqttPort();
+    MQTT mqtt = new MQTT();
+    mqtt.setHost(IP, port);
+    mqtt.setUserName(USER);
+    mqtt.setPassword(PASSWORD);
+    mqtt.setConnectAttemptsMax(3);
+    mqtt.setReconnectDelay(10);
+    mqtt.setClientId("jsonClientId1");
+
+    connection = mqtt.blockingConnection();
+    connection.connect();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (connection != null) {
+        connection.disconnect();
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  /** Test single JSON message with multiple measurements */
+  @Test
+  public void testSingleJsonMessage() throws Exception {
+    try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      String payload =
+          "{"
+              + "\"device\":\"root.sg.d1\","
+              + "\"timestamp\":1,"
+              + "\"measurements\":[\"s1\",\"s2\"],"
+              + "\"values\":[1.5,2.5]"
+              + "}";
+
+      Awaitility.await()
+          .atMost(3, TimeUnit.MINUTES)
+          .pollInterval(1, TimeUnit.SECONDS)
+          .until(
+              () -> {
+                connection.publish("root.sg.d1", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+                try (final SessionDataSet dataSet =
+                    session.executeQueryStatement("select s1, s2 from 
root.sg.d1 where time = 1")) {
+                  if (!dataSet.hasNext()) {
+                    return false;
+                  }
+                  RowRecord row = dataSet.next();
+                  List<Field> fields = row.getFields();
+                  assertEquals(2, fields.size());
+                  assertEquals(1.5, fields.get(0).getDoubleV(), 0.001);
+                  assertEquals(2.5, fields.get(1).getDoubleV(), 0.001);
+                  return true;
+                } catch (StatementExecutionException e) {
+                  if (e.getMessage() != null && e.getMessage().contains("does 
not exist")) {
+                    return false;
+                  } else {
+                    throw e;
+                  }
+                }
+              });
+    }
+  }
+
+  /** Test batch JSON message with timestamps array */
+  @Test
+  public void testBatchJsonMessage() throws Exception {
+    try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      String payload =
+          "{"
+              + "\"device\":\"root.sg.d2\","
+              + "\"timestamps\":[1,2,3],"
+              + "\"measurements\":[\"s1\",\"s2\"],"
+              + "\"values\":[[1.0,2.0],[3.0,4.0],[5.0,6.0]]"
+              + "}";
+
+      Awaitility.await()
+          .atMost(3, TimeUnit.MINUTES)
+          .pollInterval(1, TimeUnit.SECONDS)
+          .until(
+              () -> {
+                connection.publish("root.sg.d2", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+                try (final SessionDataSet dataSet =
+                    session.executeQueryStatement("select count(s1) from 
root.sg.d2")) {
+                  if (!dataSet.hasNext()) {
+                    return false;
+                  }
+                  RowRecord row = dataSet.next();
+                  // Should have 3 records
+                  assertEquals(3, row.getFields().get(0).getLongV());
+                  return true;
+                } catch (StatementExecutionException e) {
+                  if (e.getMessage() != null && e.getMessage().contains("does 
not exist")) {
+                    return false;
+                  } else {
+                    throw e;
+                  }
+                }
+              });
+    }
+  }
+
+  /** Test JSON array with multiple messages */
+  @Test
+  public void testJsonArray() throws Exception {
+    try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      String payload =
+          "["
+              + 
"{\"device\":\"root.sg.d3\",\"timestamp\":1,\"measurements\":[\"s1\"],\"values\":[10.0]},"
+              + 
"{\"device\":\"root.sg.d3\",\"timestamp\":2,\"measurements\":[\"s1\"],\"values\":[20.0]},"
+              + 
"{\"device\":\"root.sg.d3\",\"timestamp\":3,\"measurements\":[\"s1\"],\"values\":[30.0]}"
+              + "]";
+
+      Awaitility.await()
+          .atMost(3, TimeUnit.MINUTES)
+          .pollInterval(1, TimeUnit.SECONDS)
+          .until(
+              () -> {
+                connection.publish("root.sg.d3", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+                try (final SessionDataSet dataSet =
+                    session.executeQueryStatement("select s1 from 
root.sg.d3")) {
+                  int count = 0;
+                  double sum = 0;
+                  while (dataSet.hasNext()) {
+                    RowRecord row = dataSet.next();
+                    sum += row.getFields().get(0).getDoubleV();
+                    count++;
+                  }
+                  if (count != 3) {
+                    return false;
+                  }
+                  // sum should be 10 + 20 + 30 = 60
+                  assertEquals(60.0, sum, 0.001);
+                  return true;
+                } catch (StatementExecutionException e) {
+                  if (e.getMessage() != null && e.getMessage().contains("does 
not exist")) {
+                    return false;
+                  } else {
+                    throw e;
+                  }
+                }
+              });
+    }
+  }
+
+  /** Test JSON with explicit data types */
+  @Test
+  public void testJsonWithDataTypes() throws Exception {
+    try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      String payload =
+          "{"
+              + "\"device\":\"root.sg.d4\","
+              + "\"timestamp\":1,"
+              + 
"\"measurements\":[\"intVal\",\"floatVal\",\"boolVal\",\"textVal\"],"
+              + "\"values\":[100,3.14,true,\"hello\"],"
+              + "\"datatypes\":[\"INT32\",\"FLOAT\",\"BOOLEAN\",\"TEXT\"]"
+              + "}";
+
+      Awaitility.await()
+          .atMost(3, TimeUnit.MINUTES)
+          .pollInterval(1, TimeUnit.SECONDS)
+          .until(
+              () -> {
+                connection.publish("root.sg.d4", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+                try (final SessionDataSet dataSet =
+                    session.executeQueryStatement(
+                        "select intVal, floatVal, boolVal, textVal from 
root.sg.d4 where time = 1")) {
+                  if (!dataSet.hasNext()) {
+                    return false;
+                  }
+                  List<Field> fields = dataSet.next().getFields();
+                  assertEquals(4, fields.size());
+                  assertEquals(100, fields.get(0).getIntV());
+                  assertEquals(3.14f, fields.get(1).getFloatV(), 0.01);
+                  assertTrue(fields.get(2).getBoolV());
+                  assertEquals("hello", fields.get(3).getStringValue());
+                  return true;
+                } catch (StatementExecutionException e) {
+                  if (e.getMessage() != null && e.getMessage().contains("does 
not exist")) {
+                    return false;
+                  } else {
+                    throw e;
+                  }
+                }
+              });
+    }
+  }
+
+  /** Test multiple devices in single JSON array */
+  @Test
+  public void testMultipleDevicesJsonArray() throws Exception {
+    try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      String payload =
+          "["
+              + 
"{\"device\":\"root.sg.device1\",\"timestamp\":1,\"measurements\":[\"temp\"],\"values\":[25.5]},"
+              + 
"{\"device\":\"root.sg.device2\",\"timestamp\":1,\"measurements\":[\"temp\"],\"values\":[26.5]},"
+              + 
"{\"device\":\"root.sg.device3\",\"timestamp\":1,\"measurements\":[\"temp\"],\"values\":[27.5]}"
+              + "]";
+
+      Awaitility.await()
+          .atMost(3, TimeUnit.MINUTES)
+          .pollInterval(1, TimeUnit.SECONDS)
+          .until(
+              () -> {
+                connection.publish("root.sg", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+                try {
+                  // Check device1
+                  try (final SessionDataSet dataSet1 =
+                      session.executeQueryStatement(
+                          "select temp from root.sg.device1 where time = 1")) {
+                    if (!dataSet1.hasNext()) {
+                      return false;
+                    }
+                    assertEquals(25.5, 
dataSet1.next().getFields().get(0).getDoubleV(), 0.001);
+                  }
+                  // Check device2
+                  try (final SessionDataSet dataSet2 =
+                      session.executeQueryStatement(
+                          "select temp from root.sg.device2 where time = 1")) {
+                    if (!dataSet2.hasNext()) {
+                      return false;
+                    }
+                    assertEquals(26.5, 
dataSet2.next().getFields().get(0).getDoubleV(), 0.001);
+                  }
+                  // Check device3
+                  try (final SessionDataSet dataSet3 =
+                      session.executeQueryStatement(
+                          "select temp from root.sg.device3 where time = 1")) {
+                    if (!dataSet3.hasNext()) {
+                      return false;
+                    }
+                    assertEquals(27.5, 
dataSet3.next().getFields().get(0).getDoubleV(), 0.001);
+                  }
+                  return true;
+                } catch (StatementExecutionException e) {
+                  if (e.getMessage() != null && e.getMessage().contains("does 
not exist")) {
+                    return false;
+                  } else {
+                    throw e;
+                  }
+                }
+              });
+    }
+  }
+
+  /** Test batch JSON with different values per timestamp */
+  @Test
+  public void testBatchJsonWithVariousValues() throws Exception {
+    try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      String payload =
+          "{"
+              + "\"device\":\"root.sg.d5\","
+              + "\"timestamps\":[100,200,300,400,500],"
+              + "\"measurements\":[\"temperature\",\"humidity\"],"
+              + 
"\"values\":[[20.1,60.0],[21.2,61.5],[22.3,62.0],[23.4,63.5],[24.5,64.0]]"
+              + "}";
+
+      Awaitility.await()
+          .atMost(3, TimeUnit.MINUTES)
+          .pollInterval(1, TimeUnit.SECONDS)
+          .until(
+              () -> {
+                connection.publish("root.sg.d5", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+                try (final SessionDataSet dataSet =
+                    session.executeQueryStatement("select temperature, 
humidity from root.sg.d5")) {
+                  int count = 0;
+                  while (dataSet.hasNext()) {
+                    RowRecord row = dataSet.next();
+                    List<Field> fields = row.getFields();
+                    assertEquals(2, fields.size());
+                    // Temperature should be between 20 and 25
+                    double temp = fields.get(0).getDoubleV();
+                    assertTrue(temp >= 20.0 && temp <= 25.0);
+                    // Humidity should be between 60 and 65
+                    double humidity = fields.get(1).getDoubleV();
+                    assertTrue(humidity >= 60.0 && humidity <= 65.0);
+                    count++;
+                  }
+                  return count == 5;
+                } catch (StatementExecutionException e) {
+                  if (e.getMessage() != null && e.getMessage().contains("does 
not exist")) {
+                    return false;
+                  } else {
+                    throw e;
+                  }
+                }
+              });
+    }
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
index afc202ae68d..16e3c33129f 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
@@ -69,6 +69,7 @@ public class IoTDBMQTTServiceIT {
     mqtt.setPassword(PASSWORD);
     mqtt.setConnectAttemptsMax(3);
     mqtt.setReconnectDelay(10);
+    mqtt.setClientId("clientId01");
 
     connection = mqtt.blockingConnection();
     connection.connect();

Reply via email to