This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1309eef2b1 [IOTDB-3388] Let MQTT Service could parse JSON array of 
existed format (#6154)
1309eef2b1 is described below

commit 1309eef2b1fb27a8ce7c8911b9c71ee5a9ed7035
Author: 刘威 <[email protected]>
AuthorDate: Sat Jun 18 08:50:37 2022 +0800

    [IOTDB-3388] Let MQTT Service could parse JSON array of existed format 
(#6154)
---
 docs/UserGuide/API/Programming-MQTT.md             |  7 +--
 docs/zh/UserGuide/API/Programming-MQTT.md          |  7 +--
 .../java/org/apache/iotdb/mqtt/MQTTClient.java     |  7 +++
 .../db/protocol/mqtt/JSONPayloadFormatter.java     | 15 +++---
 .../db/protocol/mqtt/JSONPayloadFormatterTest.java | 58 ++++++++++++++++++++++
 5 files changed, 80 insertions(+), 14 deletions(-)

diff --git a/docs/UserGuide/API/Programming-MQTT.md 
b/docs/UserGuide/API/Programming-MQTT.md
index a06cb6d3c0..5270dd1025 100644
--- a/docs/UserGuide/API/Programming-MQTT.md
+++ b/docs/UserGuide/API/Programming-MQTT.md
@@ -35,7 +35,7 @@ The Built-in MQTT Service provide the ability of direct 
connection to IoTDB thro
  and then write the data into storage immediately. 
 The MQTT topic corresponds to IoTDB timeseries. 
 The messages payload can be format to events by `PayloadFormatter` which 
loaded by java SPI, and the default implementation is `JSONPayloadFormatter`.
-The default `json` formatter support two json format, and the following is an 
MQTT message payload example:
+The default `json` formatter support two json format and its json array. The 
following is an MQTT message payload example:
 
 ```json
  {
@@ -47,13 +47,14 @@ The default `json` formatter support two json format, and 
the following is an MQ
 ```
 or
 ```json
-{
+ {
       "device":"root.sg.d1",
       "timestamps":[1586076045524,1586076065526],
       "measurements":["s1","s2"],
       "values":[[0.530635,0.530635], [0.530655,0.530695]]
-  }
+ }
 ```
+or json array of the above two.
 
 <img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; 
margin-right:auto; display:block;" 
src="https://user-images.githubusercontent.com/6711230/78357469-1bf11880-75e4-11ea-978f-a53996667a0d.png";>
 
diff --git a/docs/zh/UserGuide/API/Programming-MQTT.md 
b/docs/zh/UserGuide/API/Programming-MQTT.md
index 78019546f2..4c832b3684 100644
--- a/docs/zh/UserGuide/API/Programming-MQTT.md
+++ b/docs/zh/UserGuide/API/Programming-MQTT.md
@@ -36,7 +36,7 @@ IoTDB 服务器包括内置的 MQTT 服务,该服务允许远程设备将消
 内置的 MQTT 服务提供了通过 MQTT 直接连接到 IoTDB 的能力。 它侦听来自 MQTT 客户端的发布消息,然后立即将数据写入存储。
 MQTT 主题与 IoTDB 时间序列相对应。
 消息有效载荷可以由 Java SPI 加载的`PayloadFormatter`格式化为事件,默认实现为`JSONPayloadFormatter` 
-   默认的`json`格式化程序支持两种 json 格式,以下是 MQTT 消息有效负载示例:
+   默认的`json`格式化程序支持两种 json 格式以及由他们组成的json数组,以下是 MQTT 消息有效负载示例:
 
 ```json
  {
@@ -48,13 +48,14 @@ MQTT 主题与 IoTDB 时间序列相对应。
 ```
 或者
 ```json
-{
+ {
       "device":"root.sg.d1",
       "timestamps":[1586076045524,1586076065526],
       "measurements":["s1","s2"],
       "values":[[0.530635,0.530635], [0.530655,0.530695]]
-  }
+ }
 ```
+或者以上两者的JSON数组形式。
 
 <img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; 
margin-right:auto; display:block;" 
src="https://user-images.githubusercontent.com/6711230/78357469-1bf11880-75e4-11ea-978f-a53996667a0d.png";>
 
diff --git a/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java 
b/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
index bd23fde6e8..050316067b 100644
--- a/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
+++ b/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
@@ -36,6 +36,7 @@ public class MQTTClient {
     connection.connect();
 
     Random random = new Random();
+    StringBuilder sb = new StringBuilder();
     for (int i = 0; i < 10; i++) {
       String payload =
           String.format(
@@ -46,10 +47,16 @@ public class MQTTClient {
                   + "\"values\":[%f]\n"
                   + "}",
               System.currentTimeMillis(), random.nextDouble());
+      sb.append(payload).append(",");
 
+      // publish a json object
       Thread.sleep(1);
       connection.publish("root.sg.d1.s1", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
     }
+    // publish a json array
+    sb.insert(0, "[");
+    sb.replace(sb.lastIndexOf(","), sb.length(), "]");
+    connection.publish("root.sg.d1.s1", sb.toString().getBytes(), 
QoS.AT_LEAST_ONCE, false);
 
     connection.disconnect();
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
 
b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
index 3493252501..c464d18e69 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
@@ -24,7 +24,6 @@ import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParseException;
-import com.google.gson.JsonSyntaxException;
 import com.google.gson.reflect.TypeToken;
 import io.netty.buffer.ByteBuf;
 
@@ -53,20 +52,20 @@ public class JSONPayloadFormatter implements 
PayloadFormatter {
       return null;
     }
     String txt = payload.toString(StandardCharsets.UTF_8);
-    try {
-      JsonObject jsonObject = GSON.fromJson(txt, JsonObject.class);
-
+    JsonElement jsonElement = GSON.fromJson(txt, JsonElement.class);
+    if (jsonElement.isJsonObject()) {
+      JsonObject jsonObject = jsonElement.getAsJsonObject();
       if (jsonObject.get(JSON_KEY_TIMESTAMP) != null) {
         return formatJson(jsonObject);
       }
       if (jsonObject.get(JSON_KEY_TIMESTAMPS) != null) {
         return formatBatchJson(jsonObject);
       }
-    } catch (JsonSyntaxException e) {
-      JsonArray jsonArray = GSON.fromJson(txt, JsonArray.class);
+    } else if (jsonElement.isJsonArray()) {
+      JsonArray jsonArray = jsonElement.getAsJsonArray();
       List<Message> messages = new ArrayList<>();
-      for (JsonElement jsonElement : jsonArray) {
-        JsonObject jsonObject = jsonElement.getAsJsonObject();
+      for (JsonElement element : jsonArray) {
+        JsonObject jsonObject = element.getAsJsonObject();
         if (jsonObject.get(JSON_KEY_TIMESTAMP) != null) {
           messages.addAll(formatJson(jsonObject));
         }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
 
b/server/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
index 0b16adc26c..082225984c 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
@@ -68,4 +68,62 @@ public class JSONPayloadFormatterTest {
     assertEquals("s2", message.getMeasurements().get(1));
     assertEquals(0.530695D, Double.parseDouble(message.getValues().get(1)), 0);
   }
+
+  @Test
+  public void formatJsonArray() {
+    String payload =
+        " [\n"
+            + "  {\n"
+            + "      \"device\":\"root.sg.d1\",\n"
+            + "      \"timestamp\":1586076045524,\n"
+            + "      \"measurements\":[\"s1\",\"s2\"],\n"
+            + "      \"values\":[0.530635,0.530635]\n"
+            + "  },\n"
+            + "  {\n"
+            + "      \"device\":\"root.sg.d2\",\n"
+            + "      \"timestamp\":1586076065526,\n"
+            + "      \"measurements\":[\"s3\",\"s4\"],\n"
+            + "      \"values\":[0.530655,0.530655]\n"
+            + "  }\n"
+            + "]";
+
+    ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+
+    JSONPayloadFormatter formatter = new JSONPayloadFormatter();
+    Message message = formatter.format(buf).get(1);
+
+    assertEquals("root.sg.d2", message.getDevice());
+    assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
+    assertEquals("s3", message.getMeasurements().get(0));
+    assertEquals(0.530655D, Double.parseDouble(message.getValues().get(0)), 0);
+  }
+
+  @Test
+  public void formatBatchJsonArray() {
+    String payload =
+        "[\n"
+            + "  {\n"
+            + "      \"device\":\"root.sg.d1\",\n"
+            + "      \"timestamps\":[1586076045524,1586076065526],\n"
+            + "      \"measurements\":[\"s1\",\"s2\"],\n"
+            + "      \"values\":[[0.530635,0.530635], [0.530655,0.530695]]\n"
+            + "  },\n"
+            + "  {\n"
+            + "      \"device\":\"root.sg.d2\",\n"
+            + "      \"timestamps\":[1586076045524,1586076065526],\n"
+            + "      \"measurements\":[\"s3\",\"s4\"],\n"
+            + "      \"values\":[[0.530635,0.530635], [0.530655,0.530695]]\n"
+            + "  }"
+            + "]";
+
+    ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+
+    JSONPayloadFormatter formatter = new JSONPayloadFormatter();
+    Message message = formatter.format(buf).get(3);
+
+    assertEquals("root.sg.d2", message.getDevice());
+    assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
+    assertEquals("s4", message.getMeasurements().get(1));
+    assertEquals(0.530695D, Double.parseDouble(message.getValues().get(1)), 0);
+  }
 }

Reply via email to