This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6d2f98b [IOTDB-241] Add query and non query interface in session
(#424)
6d2f98b is described below
commit 6d2f98bfb6fb2634ee31f5801443f3cbb93c2bec
Author: SilverNarcissus <[email protected]>
AuthorDate: Tue Oct 8 09:56:13 2019 +0800
[IOTDB-241] Add query and non query interface in session (#424)
* Add sql interface in session
---
.../main/java/org/apache/iotdb/SessionExample.java | 23 +++-
.../java/org/apache/iotdb/session/Session.java | 118 ++++++++++++++++----
.../org/apache/iotdb/session/SessionDataSet.java | 120 +++++++++++++++++++++
.../org/apache/iotdb/session/SessionUtils.java | 70 ++++++++++++
.../org/apache/iotdb/session/IoTDBSessionIT.java | 85 +++++++++++----
5 files changed, 375 insertions(+), 41 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 1982bfd..8e41dbd 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -18,22 +18,27 @@
*/
package org.apache.iotdb;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.iotdb.rpc.IoTDBRPCException;
import org.apache.iotdb.session.IoTDBSessionException;
import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.write.record.RowBatch;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
+import org.apache.thrift.TException;
public class SessionExample {
private static Session session;
- public static void main(String[] args) throws IoTDBSessionException {
+ public static void main(String[] args)
+ throws IoTDBSessionException, TException, IoTDBRPCException,
SQLException {
session = new Session("127.0.0.1", 6667, "root", "root");
session.open();
@@ -44,6 +49,8 @@ public class SessionExample {
insert();
insertRowBatch();
+ nonQuery();
+ query();
deleteData();
deleteTimeseries();
session.close();
@@ -107,4 +114,18 @@ public class SessionExample {
paths.add("root.sg1.d1.s3");
session.deleteTimeseries(paths);
}
+
+ private static void query() throws TException, IoTDBRPCException,
SQLException {
+ SessionDataSet dataSet = session.executeQueryStatement("select * from
root.sg1.d1");
+ dataSet.setBatchSize(1024); // default is 512
+ while (dataSet.hasNext()){
+ System.out.println(dataSet.next());
+ }
+
+ dataSet.closeOperationHandle();
+ }
+
+ private static void nonQuery() throws TException, IoTDBRPCException,
SQLException {
+ session.executeNonQueryStatement("insert into root.sg1.d1(timestamp,s0)
values(200, 1);");
+ }
}
\ No newline at end of file
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java
b/session/src/main/java/org/apache/iotdb/session/Session.java
index 0ad5508..4817c67 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -18,15 +18,35 @@
*/
package org.apache.iotdb.session;
+import java.sql.SQLException;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.rpc.IoTDBRPCException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.service.rpc.thrift.TSBatchInsertionReq;
+import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSInsertReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSOperationHandle;
+import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
+import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.write.record.RowBatch;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.thrift.TException;
@@ -34,24 +54,26 @@ import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
-
-import java.time.ZoneId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Session {
private static final Logger logger = LoggerFactory.getLogger(Session.class);
+ private final TSProtocolVersion protocolVersion =
TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V1;
private String host;
private int port;
private String username;
private String password;
- private final TSProtocolVersion protocolVersion =
TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V1;
private TSIService.Iface client = null;
private TS_SessionHandle sessionHandle = null;
private TSocket transport;
private boolean isClosed = true;
private ZoneId zoneId;
+ private RowRecord record;
+ private AtomicLong queryId = new AtomicLong(0);
+ private TSOperationHandle operationHandle;
+
public Session(String host, int port) {
this(host, port, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD);
@@ -86,10 +108,9 @@ public class Session {
}
}
- if(enableRPCCompression) {
+ if (enableRPCCompression) {
client = new TSIService.Client(new TCompactProtocol(transport));
- }
- else {
+ } else {
client = new TSIService.Client(new TBinaryProtocol(transport));
}
@@ -135,7 +156,8 @@ public class Session {
try {
client.closeSession(req);
} catch (TException e) {
- throw new IoTDBSessionException("Error occurs when closing session at
server. Maybe server is down.", e);
+ throw new IoTDBSessionException(
+ "Error occurs when closing session at server. Maybe server is
down.", e);
} finally {
isClosed = true;
if (transport != null) {
@@ -144,10 +166,11 @@ public class Session {
}
}
- public synchronized TSExecuteBatchStatementResp insertBatch(RowBatch
rowBatch) throws IoTDBSessionException {
+ public synchronized TSExecuteBatchStatementResp insertBatch(RowBatch
rowBatch)
+ throws IoTDBSessionException {
TSBatchInsertionReq request = new TSBatchInsertionReq();
request.deviceId = rowBatch.deviceId;
- for (MeasurementSchema measurementSchema: rowBatch.measurements) {
+ for (MeasurementSchema measurementSchema : rowBatch.measurements) {
request.addToMeasurements(measurementSchema.getMeasurementId());
request.addToTypes(measurementSchema.getType().ordinal());
}
@@ -162,7 +185,8 @@ public class Session {
}
}
- public synchronized TSStatus insert(String deviceId, long time, List<String>
measurements, List<String> values)
+ public synchronized TSStatus insert(String deviceId, long time, List<String>
measurements,
+ List<String> values)
throws IoTDBSessionException {
TSInsertReq request = new TSInsertReq();
request.setDeviceId(deviceId);
@@ -179,6 +203,7 @@ public class Session {
/**
* delete a timeseries, including data and schema
+ *
* @param path timeseries to delete, should be a whole path
*/
public synchronized TSStatus deleteTimeseries(String path) throws
IoTDBSessionException {
@@ -189,6 +214,7 @@ public class Session {
/**
* delete a timeseries, including data and schema
+ *
* @param paths timeseries to delete, should be a whole path
*/
public synchronized TSStatus deleteTimeseries(List<String> paths) throws
IoTDBSessionException {
@@ -201,6 +227,7 @@ public class Session {
/**
* delete data <= time in one timeseries
+ *
* @param path data in which time series to delete
* @param time data with time stamp less than or equal to time will be
deleted
*/
@@ -212,10 +239,12 @@ public class Session {
/**
* delete data <= time in multiple timeseries
+ *
* @param paths data in which time series to delete
* @param time data with time stamp less than or equal to time will be
deleted
*/
- public synchronized TSStatus deleteData(List<String> paths, long time)
throws IoTDBSessionException {
+ public synchronized TSStatus deleteData(List<String> paths, long time)
+ throws IoTDBSessionException {
TSDeleteDataReq request = new TSDeleteDataReq();
request.setPaths(paths);
request.setTimestamp(time);
@@ -235,21 +264,25 @@ public class Session {
}
}
- public synchronized TSStatus deleteStorageGroup(String storageGroup) throws
IoTDBSessionException {
+
+ public synchronized TSStatus deleteStorageGroup(String storageGroup)
+ throws IoTDBSessionException {
List<String> groups = new ArrayList<>();
groups.add(storageGroup);
return deleteStorageGroups(groups);
}
- public synchronized TSStatus deleteStorageGroups(List<String> storageGroup)
throws IoTDBSessionException {
+ public synchronized TSStatus deleteStorageGroups(List<String> storageGroup)
+ throws IoTDBSessionException {
try {
- return checkAndReturn(client.deleteStorageGroups(storageGroup));
+ return checkAndReturn(client.deleteStorageGroups(storageGroup));
} catch (TException e) {
- throw new IoTDBSessionException(e);
+ throw new IoTDBSessionException(e);
}
}
- public synchronized TSStatus createTimeseries(String path, TSDataType
dataType, TSEncoding encoding, CompressionType compressor) throws
IoTDBSessionException {
+ public synchronized TSStatus createTimeseries(String path, TSDataType
dataType,
+ TSEncoding encoding, CompressionType compressor) throws
IoTDBSessionException {
TSCreateTimeseriesReq request = new TSCreateTimeseriesReq();
request.setPath(path);
request.setDataType(dataType.ordinal());
@@ -294,4 +327,53 @@ public class Session {
this.zoneId = ZoneId.of(zoneId);
}
+ /**
+ * check whether this sql is for query
+ *
+ * @param sql sql
+ * @return whether this sql is for query
+ */
+ private boolean checkIsQuery(String sql) {
+ sql = sql.trim().toLowerCase();
+ return sql.startsWith("select") || sql.startsWith("show") ||
sql.startsWith("list");
+ }
+
+ /**
+ * execure query sql
+ *
+ * @param sql query statement
+ * @return result set
+ */
+ public SessionDataSet executeQueryStatement(String sql)
+ throws TException, IoTDBRPCException, SQLException {
+ if (!checkIsQuery(sql)) {
+ throw new IllegalArgumentException("your sql \"" + sql
+ + "\" is not a query statement, you should use
executeNonQueryStatement method instead.");
+ }
+
+ TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle,
sql);
+ TSExecuteStatementResp execResp = client.executeStatement(execReq);
+
+ RpcUtils.verifySuccess(execResp.getStatus());
+ operationHandle = execResp.getOperationHandle();
+ return new SessionDataSet(sql, queryId.incrementAndGet(), client,
operationHandle);
+ }
+
+ /**
+ * execute non query statement
+ *
+ * @param sql non query statement
+ */
+ public void executeNonQueryStatement(String sql) throws TException,
IoTDBRPCException {
+ if (checkIsQuery(sql)) {
+ throw new IllegalArgumentException("your sql \"" + sql
+ + "\" is a query statement, you should use executeQueryStatement
method instead.");
+ }
+
+ TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle,
sql);
+ TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq);
+ operationHandle = execResp.getOperationHandle();
+
+ RpcUtils.verifySuccess(execResp.getStatus());
+ }
}
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
new file mode 100644
index 0000000..950c67e
--- /dev/null
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session;
+
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSOperationHandle;
+import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.thrift.TException;
+
+public class SessionDataSet {
+
+ private boolean getFlag = false;
+ private String sql;
+ private long queryId;
+ private RowRecord record;
+ private Iterator<RowRecord> recordItr;
+ private TSIService.Iface client = null;
+ private TSOperationHandle operationHandle;
+ private int batchSize = 512;
+
+ public SessionDataSet(String sql, long queryId, TSIService.Iface client,
+ TSOperationHandle operationHandle) {
+ this.sql = sql;
+ this.queryId = queryId;
+ this.client = client;
+ this.operationHandle = operationHandle;
+ }
+
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ public boolean hasNext() throws SQLException, IoTDBRPCException {
+ return getFlag || nextWithoutConstraints(sql, queryId);
+ }
+
+ public RowRecord next() throws SQLException, IoTDBRPCException {
+ if (!getFlag) {
+ nextWithoutConstraints(sql, queryId);
+ }
+
+ getFlag = false;
+ return record;
+ }
+
+
+ private boolean nextWithoutConstraints(String sql, long queryId)
+ throws SQLException, IoTDBRPCException {
+ if ((recordItr == null || !recordItr.hasNext())) {
+ TSFetchResultsReq req = new TSFetchResultsReq(sql, batchSize, queryId);
+
+ try {
+ TSFetchResultsResp resp = client.fetchResults(req);
+
+ RpcUtils.verifySuccess(resp.getStatus());
+
+ if (!resp.hasResultSet) {
+ return false;
+ } else {
+ TSQueryDataSet tsQueryDataSet = resp.getQueryDataSet();
+ List<RowRecord> records =
SessionUtils.convertRowRecords(tsQueryDataSet);
+ recordItr = records.iterator();
+ }
+ } catch (TException e) {
+ throw new SQLException(
+ "Cannot fetch result from server, because of network connection :
{} ", e);
+ }
+
+ }
+
+ record = recordItr.next();
+ getFlag = true;
+ return true;
+ }
+
+ public void closeOperationHandle() throws SQLException {
+ try {
+ if (operationHandle != null) {
+ TSCloseOperationReq closeReq = new
TSCloseOperationReq(operationHandle, queryId);
+ TSStatus closeResp = client.closeOperation(closeReq);
+ RpcUtils.verifySuccess(closeResp);
+ }
+ } catch (IoTDBRPCException e) {
+ throw new SQLException("Error occurs for close opeation in server side.
The reason is " + e);
+ } catch (TException e) {
+ throw new SQLException(
+ "Error occurs when connecting to server for close operation,
because: " + e);
+ }
+ }
+}
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionUtils.java
b/session/src/main/java/org/apache/iotdb/session/SessionUtils.java
index 77fd93b..dc00576 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionUtils.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionUtils.java
@@ -19,8 +19,15 @@
package org.apache.iotdb.session;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.service.rpc.thrift.TSDataValue;
+import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
+import org.apache.iotdb.service.rpc.thrift.TSRowRecord;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.write.record.RowBatch;
@@ -87,4 +94,67 @@ public class SessionUtils {
return valueBuffer;
}
+
+
+ /**
+ * convert row records.
+ *
+ * @param tsQueryDataSet -query data set
+ * @return -list of row record
+ */
+ static List<RowRecord> convertRowRecords(TSQueryDataSet tsQueryDataSet) {
+ List<RowRecord> records = new ArrayList<>();
+ for (TSRowRecord ts : tsQueryDataSet.getRecords()) {
+ RowRecord r = new RowRecord(ts.getTimestamp());
+ int l = ts.getValuesSize();
+ for (int i = 0; i < l; i++) {
+ TSDataValue value = ts.getValues().get(i);
+ if (value.is_empty) {
+ Field field = new Field(null);
+ field.setNull();
+ r.getFields().add(field);
+ } else {
+ TSDataType dataType = TSDataType.valueOf(value.getType());
+ Field field = new Field(dataType);
+ addFieldAccordingToDataType(field, dataType, value);
+ r.getFields().add(field);
+ }
+ }
+ records.add(r);
+ }
+ return records;
+ }
+
+ /**
+ *
+ * @param field -the field need to add new data
+ * @param dataType, -the data type of the new data
+ * @param value, -the value of the new data
+ */
+ private static void addFieldAccordingToDataType(Field field, TSDataType
dataType, TSDataValue value){
+ switch (dataType) {
+ case BOOLEAN:
+ field.setBoolV(value.isBool_val());
+ break;
+ case INT32:
+ field.setIntV(value.getInt_val());
+ break;
+ case INT64:
+ field.setLongV(value.getLong_val());
+ break;
+ case FLOAT:
+ field.setFloatV((float) value.getFloat_val());
+ break;
+ case DOUBLE:
+ field.setDoubleV(value.getDouble_val());
+ break;
+ case TEXT:
+ field.setBinaryV(new Binary(value.getBinary_val()));
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("data type %s is not supported when convert data at
client",
+ dataType));
+ }
+ }
}
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
index 05d1c91..435ffa7 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -18,28 +18,34 @@
*/
package org.apache.iotdb.session;
-import java.sql.*;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
-import java.io.File;
-
-import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.rpc.IoTDBRPCException;
import org.apache.iotdb.session.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.write.record.RowBatch;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
+import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-
public class IoTDBSessionIT {
private IoTDB daemon;
@@ -55,12 +61,14 @@ public class IoTDBSessionIT {
@After
public void tearDown() throws Exception {
+ session.close();
daemon.stop();
EnvironmentUtils.cleanEnv();
}
@Test
- public void test() throws ClassNotFoundException, SQLException,
IoTDBSessionException {
+ public void test()
+ throws ClassNotFoundException, SQLException, IoTDBSessionException,
TException, IoTDBRPCException {
session = new Session("127.0.0.1", 6667, "root", "root");
session.open();
@@ -68,6 +76,11 @@ public class IoTDBSessionIT {
createTimeseries();
insert();
+
+ // sql test
+ insert_via_sql();
+ query3();
+
// insertRowBatchTest();
deleteData();
@@ -79,7 +92,8 @@ public class IoTDBSessionIT {
// Add another storage group to test the deletion of storage group
session.setStorageGroup("root.sg2");
- session.createTimeseries("root.sg2.d1.s1", TSDataType.INT64,
TSEncoding.RLE, CompressionType.SNAPPY);
+ session.createTimeseries("root.sg2.d1.s1", TSDataType.INT64,
TSEncoding.RLE,
+ CompressionType.SNAPPY);
deleteStorageGroupTest();
@@ -87,9 +101,12 @@ public class IoTDBSessionIT {
}
private void createTimeseries() throws IoTDBSessionException {
- session.createTimeseries("root.sg1.d1.s1", TSDataType.INT64,
TSEncoding.RLE, CompressionType.SNAPPY);
- session.createTimeseries("root.sg1.d1.s2", TSDataType.INT64,
TSEncoding.RLE, CompressionType.SNAPPY);
- session.createTimeseries("root.sg1.d1.s3", TSDataType.INT64,
TSEncoding.RLE, CompressionType.SNAPPY);
+ session.createTimeseries("root.sg1.d1.s1", TSDataType.INT64,
TSEncoding.RLE,
+ CompressionType.SNAPPY);
+ session.createTimeseries("root.sg1.d1.s2", TSDataType.INT64,
TSEncoding.RLE,
+ CompressionType.SNAPPY);
+ session.createTimeseries("root.sg1.d1.s3", TSDataType.INT64,
TSEncoding.RLE,
+ CompressionType.SNAPPY);
}
private void insert() throws IoTDBSessionException {
@@ -141,7 +158,7 @@ public class IoTDBSessionIT {
String path1 = "root.sg1.d1.s1";
String path2 = "root.sg1.d1.s2";
String path3 = "root.sg1.d1.s3";
- long deleteTime = 99;
+ long deleteTime = 100;
List<String> paths = new ArrayList<>();
paths.add(path1);
@@ -160,7 +177,7 @@ public class IoTDBSessionIT {
"Time\n" + "root.sg1.d1.s1\n" + "root.sg1.d1.s2\n" +
"root.sg1.d1.s3\n";
try (Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
"root");
- Statement statement = connection.createStatement()) {
+ Statement statement = connection.createStatement()) {
ResultSet resultSet = statement.executeQuery("select * from root");
final ResultSetMetaData metaData = resultSet.getMetaData();
final int colCount = metaData.getColumnCount();
@@ -169,11 +186,11 @@ public class IoTDBSessionIT {
resultStr.append(metaData.getColumnLabel(i + 1) + "\n");
}
while (resultSet.next()) {
- for (int i = 1; i <= colCount; i++) {
- resultStr.append(resultSet.getString(i)).append(",");
- }
- resultStr.append("\n");
+ for (int i = 1; i <= colCount; i++) {
+ resultStr.append(resultSet.getString(i)).append(",");
}
+ resultStr.append("\n");
+ }
Assert.assertEquals(resultStr.toString(), standard);
}
}
@@ -202,7 +219,8 @@ public class IoTDBSessionIT {
}
}
- public void deleteStorageGroupTest() throws ClassNotFoundException,
SQLException, IoTDBSessionException {
+ public void deleteStorageGroupTest()
+ throws ClassNotFoundException, SQLException, IoTDBSessionException {
try {
session.deleteStorageGroup("root.sg1.d1.s1");
} catch (IoTDBSessionException e) {
@@ -212,13 +230,14 @@ public class IoTDBSessionIT {
File folder = new File("data/system/storage_groups/root.sg1/");
assertEquals(folder.exists(), false);
session.setStorageGroup("root.sg1.d1");
- session.createTimeseries("root.sg1.d1.s1", TSDataType.INT64,
TSEncoding.RLE, CompressionType.SNAPPY);
+ session.createTimeseries("root.sg1.d1.s1", TSDataType.INT64,
TSEncoding.RLE,
+ CompressionType.SNAPPY);
// using the query result as the QueryTest to verify the deletion and the
new insertion
Class.forName(Config.JDBC_DRIVER_NAME);
String standard = "Time\n" + "root.sg1.d1.s1\n" + "root.sg2.d1.s1\n";
try (Connection connection = DriverManager
- .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/",
"root", "root");
- Statement statement = connection.createStatement()) {
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
"root");
+ Statement statement = connection.createStatement()) {
ResultSet resultSet = statement.executeQuery("select * from root");
final ResultSetMetaData metaData = resultSet.getMetaData();
final int colCount = metaData.getColumnCount();
@@ -239,4 +258,26 @@ public class IoTDBSessionIT {
session.deleteStorageGroups(storageGroups);
}
}
+
+ private void query3() throws TException, IoTDBRPCException, SQLException {
+ SessionDataSet sessionDataSet = session.executeQueryStatement("select *
from root.sg1.d1");
+ sessionDataSet.setBatchSize(1024);
+ int count = 0;
+ while (sessionDataSet.hasNext()) {
+ long index = 1;
+ count++;
+ for (Field f : sessionDataSet.next().getFields()) {
+ Assert.assertEquals(f.getLongV(), index);
+ index++;
+ }
+ }
+ Assert.assertEquals(101, count);
+ sessionDataSet.closeOperationHandle();
+ }
+
+
+ private void insert_via_sql() throws TException, IoTDBRPCException {
+ session.executeNonQueryStatement(
+ "insert into root.sg1.d1(timestamp,s1, s2, s3) values(100, 1,2,3)");
+ }
}