This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7e2f27c Switch Zeppelin connector from Session to JDBC (#2414)
7e2f27c is described below
commit 7e2f27c7dc81a4faa9bca4a8147e6cfe35a21746
Author: Rong-Kang <[email protected]>
AuthorDate: Thu Jan 7 12:04:24 2021 +0800
Switch Zeppelin connector from Session to JDBC (#2414)
---
.../main/java/org/apache/iotdb/rpc/RpcUtils.java | 2 -
zeppelin-interpreter/pom.xml | 2 +-
.../apache/zeppelin/iotdb/IoTDBInterpreter.java | 200 +++++++++++++--------
.../zeppelin/iotdb/IoTDBInterpreterTest.java | 9 +-
4 files changed, 133 insertions(+), 80 deletions(-)
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index c9be448..a068d98 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -189,8 +189,6 @@ public class RpcUtils {
switch (newTimeFormat.trim().toLowerCase()) {
case "long":
case "number":
- timeFormat = newTimeFormat.trim().toLowerCase();
- break;
case DEFAULT_TIME_FORMAT:
case "iso8601":
timeFormat = newTimeFormat.trim().toLowerCase();
diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml
index 222a95e..0376060 100644
--- a/zeppelin-interpreter/pom.xml
+++ b/zeppelin-interpreter/pom.xml
@@ -54,7 +54,7 @@
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-session</artifactId>
+ <artifactId>iotdb-jdbc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/iotdb/IoTDBInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/iotdb/IoTDBInterpreter.java
index 7574d5c..dc1c86c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/iotdb/IoTDBInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/iotdb/IoTDBInterpreter.java
@@ -17,20 +17,26 @@
package org.apache.zeppelin.iotdb;
+import static org.apache.iotdb.rpc.IoTDBRpcDataSet.TIMESTAMP_STR;
import static org.apache.iotdb.rpc.RpcUtils.setTimeFormat;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.time.ZoneId;
import java.util.Arrays;
-import java.util.List;
+import java.util.HashSet;
import java.util.Properties;
+import java.util.Set;
import org.apache.commons.lang3.StringUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.jdbc.IoTDBConnection;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.session.SessionDataSet;
-import org.apache.iotdb.tsfile.read.common.Field;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.thrift.TException;
import org.apache.zeppelin.interpreter.AbstractInterpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -52,14 +58,13 @@ public class IoTDBInterpreter extends AbstractInterpreter {
static final String IOTDB_ZONE_ID = "iotdb.zoneId";
static final String IOTDB_ENABLE_RPC_COMPRESSION =
"iotdb.enable.rpc.compression";
static final String IOTDB_TIME_DISPLAY_TYPE = "iotdb.time.display.type";
- static final String SET_TIMESTAMP_DISPLAY = "set time_display_type";
-
private static final String NONE_VALUE = "none";
static final String DEFAULT_HOST = "127.0.0.1";
static final String DEFAULT_PORT = "6667";
static final String DEFAULT_FETCH_SIZE = "10000";
static final String DEFAULT_ENABLE_RPC_COMPRESSION = "false";
static final String DEFAULT_TIME_DISPLAY_TYPE = "long";
+ static final String DEFAULT_ZONE_ID = "UTC";
private static final char TAB = '\t';
private static final char NEWLINE = '\n';
@@ -67,48 +72,84 @@ public class IoTDBInterpreter extends AbstractInterpreter {
private static final String SEMICOLON = ";";
private static final String EQUAL_SIGN = "=";
- private IoTDBConnectionException connectionException;
- private Session session;
+
+ /**
+ * should be consistent with IoTDB client
+ */
+ private static final String QUIT_COMMAND = "quit";
+ private static final String EXIT_COMMAND = "exit";
+ private static final String HELP = "help";
+ private static final String IMPORT_CMD = "import";
+ static final String SET_TIMESTAMP_DISPLAY = "set time_display_type";
+ private static final String SET_MAX_DISPLAY_NUM = "set max_display_num";
+ private static final String SHOW_TIMESTAMP_DISPLAY = "show
time_display_type";
+ private static final String SET_TIME_ZONE = "set time_zone";
+ private static final String SHOW_TIMEZONE = "show time_zone";
+ private static final String SET_FETCH_SIZE = "set fetch_size";
+ private static final String SHOW_FETCH_SIZE = "show fetch_size";
+
+ private static final Set<String> nonSupportCommandSet = new HashSet<>(
+ Arrays.asList(QUIT_COMMAND, EXIT_COMMAND, HELP, IMPORT_CMD,
SET_TIME_ZONE, SET_FETCH_SIZE,
+ SET_MAX_DISPLAY_NUM, SHOW_TIMEZONE, SHOW_TIMESTAMP_DISPLAY,
SHOW_FETCH_SIZE, IMPORT_CMD));
+
private String timeFormat;
+
+ private IoTDBConnectionException connectionException;
+ private IoTDBConnection connection = null;
+ private int fetchSize;
private ZoneId zoneId;
+
public IoTDBInterpreter(Properties property) {
super(property);
}
@Override
public void open() {
+ String host;
+ int port;
+ String passWord;
try {
- String host = getProperty(IOTDB_HOST, DEFAULT_HOST).trim();
- int port = Integer.parseInt(getProperty(IOTDB_PORT,
DEFAULT_PORT).trim());
- String userName = properties.getProperty(IOTDB_USERNAME,
NONE_VALUE).trim();
- String passWord = properties.getProperty(IOTDB_PASSWORD,
NONE_VALUE).trim();
- int fetchSize = Integer
+ host = getProperty(IOTDB_HOST, DEFAULT_HOST).trim();
+ port = Integer.parseInt(getProperty(IOTDB_PORT, DEFAULT_PORT).trim());
+ userName = properties.getProperty(IOTDB_USERNAME, NONE_VALUE).trim();
+ passWord = properties.getProperty(IOTDB_PASSWORD, NONE_VALUE).trim();
+ this.fetchSize = Integer
.parseInt(properties.getProperty(IOTDB_FETCH_SIZE,
DEFAULT_FETCH_SIZE).trim());
- String zoneStr = properties.getProperty(IOTDB_ZONE_ID);
- this.zoneId = !NONE_VALUE.equalsIgnoreCase(zoneStr) &&
StringUtils.isNotBlank(zoneStr)
- ? ZoneId.of(zoneStr.trim()) : ZoneId.systemDefault();
+
String timeDisplayType = properties.getProperty(IOTDB_TIME_DISPLAY_TYPE,
DEFAULT_TIME_DISPLAY_TYPE).trim();
this.timeFormat = setTimeFormat(timeDisplayType);
- boolean enableRPCCompression = "true".equalsIgnoreCase(
+ Config.rpcThriftCompressionEnable = "true".equalsIgnoreCase(
properties.getProperty(IOTDB_ENABLE_RPC_COMPRESSION,
DEFAULT_ENABLE_RPC_COMPRESSION).trim());
- session = new Session(host, port, userName, passWord, fetchSize, zoneId);
- session.open(enableRPCCompression);
- } catch (IoTDBConnectionException e) {
- connectionException = e;
+
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ this.connection = (IoTDBConnection) DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + host + ":" + port + "/",
userName, passWord);
+ String zoneStr = properties.getProperty(IOTDB_ZONE_ID);
+ if (!NONE_VALUE.equalsIgnoreCase(zoneStr) &&
StringUtils.isNotBlank(zoneStr)) {
+ this.zoneId = ZoneId.of(zoneStr.trim());
+ connection.setTimeZone(zoneStr);
+ } else {
+ this.zoneId = ZoneId.systemDefault();
+ connection.setTimeZone(this.zoneId.getId());
+ }
+ connection.setTimeZone(this.zoneId.getId());
+
+ } catch (SQLException | ClassNotFoundException | TException e) {
+ connectionException = new IoTDBConnectionException(e);
}
}
@Override
public void close() {
try {
- if (session != null) {
- session.close();
+ if (this.connection != null) {
+ this.connection.close();
}
- } catch (IoTDBConnectionException e) {
- connectionException = e;
+ } catch (SQLException e) {
+ connectionException = new IoTDBConnectionException(e);
}
}
@@ -131,62 +172,75 @@ public class IoTDBInterpreter extends AbstractInterpreter {
try {
String[] scriptLines = parseMultiLinesSQL(st);
InterpreterResult interpreterResult = null;
+
for (String scriptLine : scriptLines) {
- String lowercaseSc = scriptLine.toLowerCase();
- if (lowercaseSc.startsWith(SET_TIMESTAMP_DISPLAY)) {
- String[] values = scriptLine.split(EQUAL_SIGN);
- if (values.length != 2) {
- throw new StatementExecutionException(
- String.format("Time display format error, please input like
%s=ISO8601",
- SET_TIMESTAMP_DISPLAY));
- }
- String timeDisplayType = values[1].trim();
- this.timeFormat = setTimeFormat(values[1]);
- interpreterResult = new InterpreterResult(Code.SUCCESS, "Time
display type has set to " +
- timeDisplayType);
- } else if (lowercaseSc.startsWith("select")) {
- //Execute query
- String msg;
- msg = getResultWithCols(session, scriptLine);
- interpreterResult = new InterpreterResult(Code.SUCCESS);
- interpreterResult.add(Type.TABLE, msg);
- } else {
- //Execute non query
- session.executeNonQueryStatement(scriptLine);
- interpreterResult = new InterpreterResult(Code.SUCCESS, "Sql
executed.");
- }
+ interpreterResult = handleInputCmd(scriptLine, connection);
}
return interpreterResult;
} catch (StatementExecutionException e) {
return new InterpreterResult(Code.ERROR, "StatementExecutionException: "
+ e.getMessage());
- } catch (IoTDBConnectionException e) {
- return new InterpreterResult(Code.ERROR, "IoTDBConnectionException: " +
e.getMessage());
}
}
- private String getResultWithCols(Session session, String sql)
- throws StatementExecutionException, IoTDBConnectionException {
- SessionDataSet sessionDataSet = session.executeQueryStatement(sql);
- List<String> columnNames = sessionDataSet.getColumnNames();
- StringBuilder stringBuilder = new StringBuilder();
- for (String key : columnNames) {
- stringBuilder.append(key);
- stringBuilder.append(TAB);
+ private InterpreterResult handleInputCmd(String cmd, IoTDBConnection
connection)
+ throws StatementExecutionException {
+ String specialCmd = cmd.toLowerCase().trim();
+ if (nonSupportCommandSet.contains(specialCmd)) {
+ return new InterpreterResult(Code.ERROR, "Not supported in Zeppelin: " +
specialCmd);
+ }
+ if (specialCmd.startsWith(SET_TIMESTAMP_DISPLAY)) {
+ String[] values = cmd.split(EQUAL_SIGN);
+ if (values.length != 2) {
+ throw new StatementExecutionException(
+ String.format("Time display format error, please input like
%s=ISO8601",
+ SET_TIMESTAMP_DISPLAY));
+ }
+ String timeDisplayType = values[1].trim();
+ this.timeFormat = setTimeFormat(values[1]);
+ return new InterpreterResult(Code.SUCCESS, "Time display type has set to
" +
+ timeDisplayType);
}
- stringBuilder.deleteCharAt(stringBuilder.length() - 1);
- stringBuilder.append(NEWLINE);
- while (sessionDataSet.hasNext()) {
- RowRecord record = sessionDataSet.next();
- stringBuilder.append(RpcUtils.formatDatetime(timeFormat,
RpcUtils.DEFAULT_TIMESTAMP_PRECISION,
- record.getTimestamp(), zoneId));
- for (Field f : record.getFields()) {
- stringBuilder.append(TAB);
- stringBuilder.append(f);
+ return executeQuery(connection, cmd);
+ }
+
+ private InterpreterResult executeQuery(IoTDBConnection connection, String
cmd) {
+ StringBuilder stringBuilder = new StringBuilder();
+ try (Statement statement = connection.createStatement()) {
+ statement.setFetchSize(fetchSize);
+ boolean hasResultSet = statement.execute(cmd.trim());
+ InterpreterResult interpreterResult;
+ if (hasResultSet) {
+ try (ResultSet resultSet = statement.getResultSet()) {
+ final ResultSetMetaData metaData = resultSet.getMetaData();
+ final int columnCount = metaData.getColumnCount();
+
+ for (int i = 1; i <= columnCount; i++) {
+ stringBuilder.append(metaData.getColumnLabel(i));
+ stringBuilder.append(TAB);
+ }
+ stringBuilder.deleteCharAt(stringBuilder.length() - 1);
+ stringBuilder.append(NEWLINE);
+ while (resultSet.next()) {
+ stringBuilder
+ .append(RpcUtils.formatDatetime(timeFormat,
RpcUtils.DEFAULT_TIMESTAMP_PRECISION,
+ resultSet.getLong(TIMESTAMP_STR), zoneId));
+ for (int i = 2; i <= columnCount; i++) {
+ stringBuilder.append(TAB);
+ stringBuilder.append(resultSet.getString(i));
+ }
+ stringBuilder.append(NEWLINE);
+ }
+ stringBuilder.deleteCharAt(stringBuilder.length() - 1);
+ interpreterResult = new InterpreterResult(Code.SUCCESS);
+ interpreterResult.add(Type.TABLE, stringBuilder.toString());
+ return interpreterResult;
+ }
+ } else {
+ return new InterpreterResult(Code.SUCCESS, "Sql executed.");
}
- stringBuilder.append(NEWLINE);
+ } catch (SQLException e) {
+ return new InterpreterResult(Code.ERROR, "SQLException: " +
e.getMessage());
}
- stringBuilder.deleteCharAt(stringBuilder.length() - 1);
- return stringBuilder.toString();
}
@Override
@@ -197,8 +251,8 @@ public class IoTDBInterpreter extends AbstractInterpreter {
@Override
public void cancel(InterpreterContext context) {
try {
- session.close();
- } catch (IoTDBConnectionException e) {
+ connection.close();
+ } catch (SQLException e) {
LOGGER.error("Exception close failed", e);
}
}
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/iotdb/IoTDBInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/iotdb/IoTDBInterpreterTest.java
index 2013c47..94b836f 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/iotdb/IoTDBInterpreterTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/iotdb/IoTDBInterpreterTest.java
@@ -24,6 +24,7 @@ import static org.apache.zeppelin.iotdb.IoTDBInterpreter.DEFAULT_FETCH_SIZE;
import static org.apache.zeppelin.iotdb.IoTDBInterpreter.DEFAULT_HOST;
import static org.apache.zeppelin.iotdb.IoTDBInterpreter.DEFAULT_PORT;
import static
org.apache.zeppelin.iotdb.IoTDBInterpreter.DEFAULT_TIME_DISPLAY_TYPE;
+import static org.apache.zeppelin.iotdb.IoTDBInterpreter.DEFAULT_ZONE_ID;
import static
org.apache.zeppelin.iotdb.IoTDBInterpreter.IOTDB_ENABLE_RPC_COMPRESSION;
import static org.apache.zeppelin.iotdb.IoTDBInterpreter.IOTDB_FETCH_SIZE;
import static org.apache.zeppelin.iotdb.IoTDBInterpreter.IOTDB_HOST;
@@ -59,7 +60,7 @@ public class IoTDBInterpreterTest {
properties.put(IOTDB_USERNAME, "root");
properties.put(IOTDB_PASSWORD, "root");
properties.put(IOTDB_FETCH_SIZE, DEFAULT_FETCH_SIZE);
- properties.put(IOTDB_ZONE_ID, "UTC");
+ properties.put(IOTDB_ZONE_ID, DEFAULT_ZONE_ID);
properties.put(IOTDB_ENABLE_RPC_COMPRESSION,
DEFAULT_ENABLE_RPC_COMPRESSION);
properties.put(IOTDB_TIME_DISPLAY_TYPE, DEFAULT_TIME_DISPLAY_TYPE);
interpreter = new IoTDBInterpreter(properties);
@@ -193,7 +194,7 @@ public class IoTDBInterpreterTest {
Assert.assertNotNull(actual);
Assert.assertEquals(Code.ERROR, actual.code());
Assert.assertEquals(
- "StatementExecutionException: 401: meet error while parsing SQL to
physical plan: {}line 1:13 missing ROOT at '<EOF>'",
+ "SQLException: 401: line 1:13 missing ROOT at '<EOF>'",
actual.message().get(0).getData());
wrongSql = "select * from a";
@@ -201,14 +202,14 @@ public class IoTDBInterpreterTest {
Assert.assertNotNull(actual);
Assert.assertEquals(Code.ERROR, actual.code());
Assert.assertEquals(
- "StatementExecutionException: 401: meet error while parsing SQL to
physical plan: {}line 1:14 mismatched input 'a' expecting {FROM, ',', '.'}",
+ "SQLException: 401: line 1:14 mismatched input 'a' expecting {FROM,
',', '.'}",
actual.message().get(0).getData());
wrongSql = "select * from root a";
Assert.assertNotNull(actual);
Assert.assertEquals(Code.ERROR, actual.code());
Assert.assertEquals(
- "StatementExecutionException: 401: meet error while parsing SQL to
physical plan: {}line 1:14 mismatched input 'a' expecting {FROM, ',', '.'}",
+ "SQLException: 401: line 1:14 mismatched input 'a' expecting {FROM,
',', '.'}",
actual.message().get(0).getData());
}