This is an automated email from the ASF dual-hosted git repository.
kangrong pushed a commit to branch f_index_dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/f_index_dev by this push:
new f606ceb add zeppelin: multi-queries merge
f606ceb is described below
commit f606ceb2766aefc7399a867b0b543b94be73228b
Author: kr11 <3095717866.com>
AuthorDate: Fri May 14 19:07:37 2021 +0800
add zeppelin: multi-queries merge
---
.../apache/iotdb/db/index/common/IndexUtils.java | 3 +-
.../apache/zeppelin/iotdb/IoTDBInterpreter.java | 121 ++++++++++++++++++++-
.../zeppelin/iotdb/IoTDBInterpreterTest.java | 35 ++++++
3 files changed, 155 insertions(+), 4 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/index/common/IndexUtils.java
b/server/src/main/java/org/apache/iotdb/db/index/common/IndexUtils.java
index 06aa36d..6befdff 100644
--- a/server/src/main/java/org/apache/iotdb/db/index/common/IndexUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/index/common/IndexUtils.java
@@ -227,8 +227,7 @@ public class IndexUtils {
return SystemFileFactory.INSTANCE.getFile(filePath);
}
- private IndexUtils() {
- }
+ private IndexUtils() {}
public static Map<String, Object> toUpperCaseProps(Map<String, Object>
props) {
Map<String, Object> uppercase = new HashMap<>(props.size());
diff --git
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/iotdb/IoTDBInterpreter.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/iotdb/IoTDBInterpreter.java
index 62a0ac6..6c3cbd2 100644
---
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/iotdb/IoTDBInterpreter.java
+++
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/iotdb/IoTDBInterpreter.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.jdbc.IoTDBJDBCResultSet;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.commons.lang3.StringUtils;
import org.apache.thrift.TException;
@@ -43,11 +44,14 @@ import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.ZoneId;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.TreeMap;
import static org.apache.iotdb.rpc.IoTDBRpcDataSet.TIMESTAMP_STR;
import static org.apache.iotdb.rpc.RpcUtils.setTimeFormat;
@@ -197,10 +201,23 @@ public class IoTDBInterpreter extends AbstractInterpreter
{
try {
String[] scriptLines = parseMultiLinesSQL(st);
InterpreterResult interpreterResult = null;
-
+ List<String> toServerCmds = new ArrayList<>();
for (String scriptLine : scriptLines) {
interpreterResult = handleInputCmd(scriptLine, connection);
+ if (interpreterResult == null) {
+ // handleInputCmd returned null: this line must be sent to the server; collect such lines to run together and merge the
results.
+ toServerCmds.add(scriptLine);
+ }
+ }
+ // execute the collected server-side commands
+ if (toServerCmds.isEmpty()) {
+ return interpreterResult;
}
+ if (toServerCmds.size() == 1) {
+ return executeQuery(connection, toServerCmds.get(0));
+ }
+ // multi lines
+ interpreterResult = executeMultiQueryAndMerge(connection, toServerCmds);
return interpreterResult;
} catch (StatementExecutionException e) {
return new InterpreterResult(Code.ERROR, "StatementExecutionException: "
+ e.getMessage());
@@ -275,7 +292,107 @@ public class IoTDBInterpreter extends AbstractInterpreter
{
return new InterpreterResult(
Code.SUCCESS, "Query timeout has set to " + queryTimeout + "
seconds");
}
- return executeQuery(connection, cmd);
+ // return executeQuery(connection, cmd);
+ return null;
+ }
+
+ private InterpreterResult executeMultiQueryAndMerge(
+ IoTDBConnection connection, List<String> toServerCmds) { // runs every command on one statement and merges the query rows by timestamp into one TABLE result
+ try (Statement statement = connection.createStatement()) {
+ statement.setFetchSize(fetchSize);
+ if (this.queryTimeout > 0) {
+ statement.setQueryTimeout(this.queryTimeout);
+ }
+ // cmdIdx -> <column header string, matching string of null placeholders>
+ TreeMap<Integer, Pair<StringBuilder, StringBuilder>> columnsAndNullMap =
new TreeMap<>();
+ // timestamp -> (cmdIdx -> tab-terminated value string of that command's row)
+ TreeMap<Long, TreeMap<Integer, StringBuilder>> table = new TreeMap<>();
+
+ for (int cmdIdx = 0; cmdIdx < toServerCmds.size(); cmdIdx++) {
+ String cmd = toServerCmds.get(cmdIdx);
+ boolean hasResultSet = statement.execute(cmd.trim());
+ if (!hasResultSet) {
+ // non-query statement: it produces no result set to merge
+ continue;
+ }
+ try (ResultSet resultSet = statement.getResultSet()) {
+ // queries that ignore the timestamp are rejected with an error; they are not allowed in multi-line mode
+ boolean printTimestamp =
+ resultSet instanceof IoTDBJDBCResultSet
+ && !((IoTDBJDBCResultSet) resultSet).isIgnoreTimeStamp();
+ if (!printTimestamp) {
+ return new InterpreterResult(
+ Code.ERROR, "Cannot execute ignore-time query in multi-line
mode: " + cmd);
+ }
+ // header: labels of columns 2..columnCount (column 1 is skipped, presumably the time column), plus one NULL_ITEM per column
+ final ResultSetMetaData metaData = resultSet.getMetaData();
+ final int columnCount = metaData.getColumnCount();
+ StringBuilder columnHeader = new StringBuilder();
+ StringBuilder columnNulls = new StringBuilder();
+ for (int i = 2; i <= columnCount; i++) {
+ columnHeader.append(metaData.getColumnLabel(i).trim()).append(TAB);
+ columnNulls.append(NULL_ITEM).append(TAB);
+ }
+ // deleteLast(columnHeader);
+ // deleteLast(columnNulls);
+ columnsAndNullMap.put(cmdIdx, new Pair<>(columnHeader, columnNulls));
+ // values: one tab-terminated string per row, stored under the row's timestamp and this command's index
+ while (resultSet.next()) {
+ long time = resultSet.getLong(TIMESTAMP_STR);
+ StringBuilder stringBuilder = new StringBuilder();
+ for (int i = 2; i <= columnCount; i++) {
+ stringBuilder.append(
+
Optional.ofNullable(resultSet.getString(i)).orElse(NULL_ITEM).trim());
+ stringBuilder.append(TAB);
+ }
+ // deleteLast(stringBuilder);
+ table.computeIfAbsent(time, d -> new TreeMap<>()).put(cmdIdx,
stringBuilder);
+ }
+ }
+ }
+ if (columnsAndNullMap.isEmpty()) {
+ // no command returned a result set (all were non-queries), so just report success
+ return new InterpreterResult(Code.SUCCESS, "Sql executed.");
+ }
+ String result = treeMapToTableString(columnsAndNullMap, table);
+ InterpreterResult interpreterResult = new
InterpreterResult(Code.SUCCESS);
+ interpreterResult.add(Type.TABLE, result);
+ return interpreterResult;
+ } catch (SQLException e) {
+ return new InterpreterResult(Code.ERROR, "SQLException: " +
e.getMessage());
+ }
+ }
+
+ private String treeMapToTableString(
+ TreeMap<Integer, Pair<StringBuilder, StringBuilder>> columnsAndNullMap,
+ TreeMap<Long, TreeMap<Integer, StringBuilder>> table) { // renders the merged results as text: one header line, then one line per timestamp
+ StringBuilder res = new StringBuilder();
+ // combine header: the timestamp column followed by every command's column labels, in cmdIdx order
+ res.append(TIMESTAMP_STR).append(TAB);
+ columnsAndNullMap.forEach((cmdIdx, headerNullPair) ->
res.append(headerNullPair.left));
+ deleteLast(res); // presumably drops the trailing TAB; deleteLast is defined outside this view
+ res.append(NEWLINE);
+ // Set<Integer> cmdIdxSet = columnsAndNullMap.keySet();
+ table.forEach(
+ (time, cmdIdxValueMap) -> {
+ res.append(
+ RpcUtils.formatDatetime(
+ timeFormat, RpcUtils.DEFAULT_TIMESTAMP_PRECISION, time,
zoneId))
+ .append(TAB);
+ columnsAndNullMap.forEach(
+ (cmdIdx, headerNullPair) -> {
+ if (cmdIdxValueMap.containsKey(cmdIdx)) {
+ res.append(cmdIdxValueMap.get(cmdIdx));
+ } else {
+ // this command has no row at this timestamp: pad its columns with the null placeholders
+ res.append(headerNullPair.right);
+ }
+ });
+ deleteLast(res);
+ res.append(NEWLINE);
+ });
+ deleteLast(res); // presumably drops the final NEWLINE; verify against deleteLast
+ return res.toString();
}
private InterpreterResult executeQuery(IoTDBConnection connection, String
cmd) {
diff --git
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/iotdb/IoTDBInterpreterTest.java
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/iotdb/IoTDBInterpreterTest.java
index 000b5fc..7330e47 100644
---
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/iotdb/IoTDBInterpreterTest.java
+++
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/iotdb/IoTDBInterpreterTest.java
@@ -361,6 +361,41 @@ public class IoTDBInterpreterTest {
}
@Test
+ public void TestMultiQueries() { // smoke test: creates three series, inserts five rows, then sends two selects in a single call
+ String insertSQL =
+ "SET STORAGE GROUP TO root.multiquery.wf02.wt01;\n"
+ + "CREATE TIMESERIES root.multiquery.wf02.wt01.status WITH
DATATYPE=BOOLEAN, ENCODING=PLAIN;\n"
+ + "CREATE TIMESERIES root.multiquery.wf02.wt01.temperature WITH
DATATYPE=FLOAT, ENCODING=PLAIN;\n"
+ + "CREATE TIMESERIES root.multiquery.wf02.wt01.hardware WITH
DATATYPE=INT32, ENCODING=PLAIN;\n"
+ + "\n"
+ + "INSERT INTO root.multiquery.wf02.wt01 (timestamp, temperature,
status, hardware)\n"
+ + "VALUES (1, 1.1, false, 11);\n"
+ + "\n"
+ + "INSERT INTO root.multiquery.wf02.wt01 (timestamp, hardware)\n"
+ + "VALUES (2, 22);\n"
+ + "\n"
+ + "INSERT INTO root.multiquery.wf02.wt01 (timestamp, status,
hardware)\n"
+ + "VALUES (3, true, 33);\n"
+ + "\n"
+ + "INSERT INTO root.multiquery.wf02.wt01 (timestamp,
temperature)\n"
+ + "VALUES (4, 4.4);\n"
+ + "\n"
+ + "INSERT INTO root.multiquery.wf02.wt01 (timestamp, temperature,
status)\n"
+ + "VALUES (5, 5.5, false);\n";
+ System.out.println(insertSQL);
+ InterpreterResult actual = interpreter.internalInterpret(insertSQL, null);
+ System.out.println(actual.message().get(0).getData());
+ String sql1 =
+ "select temperature from root.multiquery.wf02.wt01;\n"
+ + "select status, hardware from root.multiquery.wf02.wt01";
+ System.out.println(sql1);
+ actual = interpreter.internalInterpret(sql1, null);
+ Assert.assertNotNull(actual); // only non-null is asserted; the result code and table contents are not checked
+ // Assert.assertEquals(Code.SUCCESS, actual.code());
+ System.out.println(actual.message().get(0).getData());
+ }
+
+ @Test
public void testShowAllTTL() {
interpreter.internalInterpret("SET TTL TO root.test.wf01 12345", null);
InterpreterResult actual = interpreter.internalInterpret("SHOW ALL TTL",
null);