This is an automated email from the ASF dual-hosted git repository.

kangrong pushed a commit to branch f_index_dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/f_index_dev by this push:
     new f606ceb  add zeppelin: multi-queries merge
f606ceb is described below

commit f606ceb2766aefc7399a867b0b543b94be73228b
Author: kr11 <3095717866.com>
AuthorDate: Fri May 14 19:07:37 2021 +0800

    add zeppelin: multi-queries merge
---
 .../apache/iotdb/db/index/common/IndexUtils.java   |   3 +-
 .../apache/zeppelin/iotdb/IoTDBInterpreter.java    | 121 ++++++++++++++++++++-
 .../zeppelin/iotdb/IoTDBInterpreterTest.java       |  35 ++++++
 3 files changed, 155 insertions(+), 4 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/index/common/IndexUtils.java 
b/server/src/main/java/org/apache/iotdb/db/index/common/IndexUtils.java
index 06aa36d..6befdff 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/common/IndexUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/IndexUtils.java
@@ -227,8 +227,7 @@ public class IndexUtils {
     return SystemFileFactory.INSTANCE.getFile(filePath);
   }
 
-  private IndexUtils() {
-  }
+  private IndexUtils() {}
 
   public static Map<String, Object> toUpperCaseProps(Map<String, Object> 
props) {
     Map<String, Object> uppercase = new HashMap<>(props.size());
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/iotdb/IoTDBInterpreter.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/iotdb/IoTDBInterpreter.java
index 62a0ac6..6c3cbd2 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/iotdb/IoTDBInterpreter.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/iotdb/IoTDBInterpreter.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.jdbc.IoTDBJDBCResultSet;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.thrift.TException;
@@ -43,11 +44,14 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.ZoneId;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.TreeMap;
 
 import static org.apache.iotdb.rpc.IoTDBRpcDataSet.TIMESTAMP_STR;
 import static org.apache.iotdb.rpc.RpcUtils.setTimeFormat;
@@ -197,10 +201,23 @@ public class IoTDBInterpreter extends AbstractInterpreter 
{
     try {
       String[] scriptLines = parseMultiLinesSQL(st);
       InterpreterResult interpreterResult = null;
-
+      List<String> toServerCmds = new ArrayList<>();
       for (String scriptLine : scriptLines) {
         interpreterResult = handleInputCmd(scriptLine, connection);
+        if (interpreterResult == null) {
+          // it need to connect server. we do them together and merge the 
result.
+          toServerCmds.add(scriptLine);
+        }
+      }
+      // do them
+      if (toServerCmds.isEmpty()) {
+        return interpreterResult;
       }
+      if (toServerCmds.size() == 1) {
+        return executeQuery(connection, toServerCmds.get(0));
+      }
+      // multi lines
+      interpreterResult = executeMultiQueryAndMerge(connection, toServerCmds);
       return interpreterResult;
     } catch (StatementExecutionException e) {
       return new InterpreterResult(Code.ERROR, "StatementExecutionException: " 
+ e.getMessage());
@@ -275,7 +292,107 @@ public class IoTDBInterpreter extends AbstractInterpreter 
{
       return new InterpreterResult(
           Code.SUCCESS, "Query timeout has set to " + queryTimeout + " 
seconds");
     }
-    return executeQuery(connection, cmd);
+    //    return executeQuery(connection, cmd);
+    return null;
+  }
+
+  private InterpreterResult executeMultiQueryAndMerge(
+      IoTDBConnection connection, List<String> toServerCmds) {
+    try (Statement statement = connection.createStatement()) {
+      statement.setFetchSize(fetchSize);
+      if (this.queryTimeout > 0) {
+        statement.setQueryTimeout(this.queryTimeout);
+      }
+      // cmdIdx-<column_name, null_string>
+      TreeMap<Integer, Pair<StringBuilder, StringBuilder>> columnsAndNullMap = 
new TreeMap<>();
+      // time-cmdIdx-valueString
+      TreeMap<Long, TreeMap<Integer, StringBuilder>> table = new TreeMap<>();
+
+      for (int cmdIdx = 0; cmdIdx < toServerCmds.size(); cmdIdx++) {
+        String cmd = toServerCmds.get(cmdIdx);
+        boolean hasResultSet = statement.execute(cmd.trim());
+        if (!hasResultSet) {
+          // it's nonQuery, no result.
+          continue;
+        }
+        try (ResultSet resultSet = statement.getResultSet()) {
+          // 对于忽略时间戳的,直接报错,不允许多行
+          boolean printTimestamp =
+              resultSet instanceof IoTDBJDBCResultSet
+                  && !((IoTDBJDBCResultSet) resultSet).isIgnoreTimeStamp();
+          if (!printTimestamp) {
+            return new InterpreterResult(
+                Code.ERROR, "Cannot execute ignore-time query in multi-line 
mode: " + cmd);
+          }
+          // header
+          final ResultSetMetaData metaData = resultSet.getMetaData();
+          final int columnCount = metaData.getColumnCount();
+          StringBuilder columnHeader = new StringBuilder();
+          StringBuilder columnNulls = new StringBuilder();
+          for (int i = 2; i <= columnCount; i++) {
+            columnHeader.append(metaData.getColumnLabel(i).trim()).append(TAB);
+            columnNulls.append(NULL_ITEM).append(TAB);
+          }
+          //          deleteLast(columnHeader);
+          //          deleteLast(columnNulls);
+          columnsAndNullMap.put(cmdIdx, new Pair<>(columnHeader, columnNulls));
+          // value
+          while (resultSet.next()) {
+            long time = resultSet.getLong(TIMESTAMP_STR);
+            StringBuilder stringBuilder = new StringBuilder();
+            for (int i = 2; i <= columnCount; i++) {
+              stringBuilder.append(
+                  
Optional.ofNullable(resultSet.getString(i)).orElse(NULL_ITEM).trim());
+              stringBuilder.append(TAB);
+            }
+            //            deleteLast(stringBuilder);
+            table.computeIfAbsent(time, d -> new TreeMap<>()).put(cmdIdx, 
stringBuilder);
+          }
+        }
+      }
+      if (columnsAndNullMap.isEmpty()) {
+        // no query, all non-query, just return
+        return new InterpreterResult(Code.SUCCESS, "Sql executed.");
+      }
+      String result = treeMapToTableString(columnsAndNullMap, table);
+      InterpreterResult interpreterResult = new 
InterpreterResult(Code.SUCCESS);
+      interpreterResult.add(Type.TABLE, result);
+      return interpreterResult;
+    } catch (SQLException e) {
+      return new InterpreterResult(Code.ERROR, "SQLException: " + 
e.getMessage());
+    }
+  }
+
+  private String treeMapToTableString(
+      TreeMap<Integer, Pair<StringBuilder, StringBuilder>> columnsAndNullMap,
+      TreeMap<Long, TreeMap<Integer, StringBuilder>> table) {
+    StringBuilder res = new StringBuilder();
+    // combine header
+    res.append(TIMESTAMP_STR).append(TAB);
+    columnsAndNullMap.forEach((cmdIdx, headerNullPair) -> 
res.append(headerNullPair.left));
+    deleteLast(res);
+    res.append(NEWLINE);
+    //    Set<Integer> cmdIdxSet = columnsAndNullMap.keySet();
+    table.forEach(
+        (time, cmdIdxValueMap) -> {
+          res.append(
+                  RpcUtils.formatDatetime(
+                      timeFormat, RpcUtils.DEFAULT_TIMESTAMP_PRECISION, time, 
zoneId))
+              .append(TAB);
+          columnsAndNullMap.forEach(
+              (cmdIdx, headerNullPair) -> {
+                if (cmdIdxValueMap.containsKey(cmdIdx)) {
+                  res.append(cmdIdxValueMap.get(cmdIdx));
+                } else {
+                  // append null string
+                  res.append(headerNullPair.right);
+                }
+              });
+          deleteLast(res);
+          res.append(NEWLINE);
+        });
+    deleteLast(res);
+    return res.toString();
   }
 
   private InterpreterResult executeQuery(IoTDBConnection connection, String 
cmd) {
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/iotdb/IoTDBInterpreterTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/iotdb/IoTDBInterpreterTest.java
index 000b5fc..7330e47 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/iotdb/IoTDBInterpreterTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/iotdb/IoTDBInterpreterTest.java
@@ -361,6 +361,41 @@ public class IoTDBInterpreterTest {
   }
 
   @Test
+  public void TestMultiQueries() {
+    String insertSQL =
+        "SET STORAGE GROUP TO root.multiquery.wf02.wt01;\n"
+            + "CREATE TIMESERIES root.multiquery.wf02.wt01.status WITH 
DATATYPE=BOOLEAN, ENCODING=PLAIN;\n"
+            + "CREATE TIMESERIES root.multiquery.wf02.wt01.temperature WITH 
DATATYPE=FLOAT, ENCODING=PLAIN;\n"
+            + "CREATE TIMESERIES root.multiquery.wf02.wt01.hardware WITH 
DATATYPE=INT32, ENCODING=PLAIN;\n"
+            + "\n"
+            + "INSERT INTO root.multiquery.wf02.wt01 (timestamp, temperature, 
status, hardware)\n"
+            + "VALUES (1, 1.1, false, 11);\n"
+            + "\n"
+            + "INSERT INTO root.multiquery.wf02.wt01 (timestamp, hardware)\n"
+            + "VALUES (2, 22);\n"
+            + "\n"
+            + "INSERT INTO root.multiquery.wf02.wt01 (timestamp, status, 
hardware)\n"
+            + "VALUES (3, true, 33);\n"
+            + "\n"
+            + "INSERT INTO root.multiquery.wf02.wt01 (timestamp, 
temperature)\n"
+            + "VALUES (4, 4.4);\n"
+            + "\n"
+            + "INSERT INTO root.multiquery.wf02.wt01 (timestamp, temperature, 
status)\n"
+            + "VALUES (5, 5.5, false);\n";
+    System.out.println(insertSQL);
+    InterpreterResult actual = interpreter.internalInterpret(insertSQL, null);
+    System.out.println(actual.message().get(0).getData());
+    String sql1 =
+        "select temperature from root.multiquery.wf02.wt01;\n"
+            + "select status, hardware from root.multiquery.wf02.wt01";
+    System.out.println(sql1);
+    actual = interpreter.internalInterpret(sql1, null);
+    Assert.assertNotNull(actual);
+    //    Assert.assertEquals(Code.SUCCESS, actual.code());
+    System.out.println(actual.message().get(0).getData());
+  }
+
+  @Test
   public void testShowAllTTL() {
     interpreter.internalInterpret("SET TTL TO root.test.wf01 12345", null);
     InterpreterResult actual = interpreter.internalInterpret("SHOW ALL TTL", 
null);

Reply via email to