This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0fe46eb [IOTDB-630] Add a jdbc-like way to fetch data in session
(#1129)
0fe46eb is described below
commit 0fe46eb2802863e66827b33f4a3ed6df0130cd89
Author: Jackie Tien <[email protected]>
AuthorDate: Fri May 8 13:30:02 2020 +0800
[IOTDB-630] Add a jdbc-like way to fetch data in session (#1129)
* Add a jdbc-like way to fetch data in session
---
.../main/java/org/apache/iotdb/SessionExample.java | 40 +-
.../java/org/apache/iotdb/session/Session.java | 2 +-
.../org/apache/iotdb/session/SessionDataSet.java | 459 ++++++++++++++++-----
.../iotdb/session/pool/SessionDataSetWrapper.java | 4 +-
.../org/apache/iotdb/session/IoTDBSessionIT.java | 8 +-
.../iotdb/session/IoTDBSessionIteratorIT.java | 122 ++++++
6 files changed, 519 insertions(+), 116 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 6aa0e25..57d0a64 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -18,21 +18,21 @@
*/
package org.apache.iotdb;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.session.SessionDataSet.DataIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.Schema;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
public class SessionExample {
@@ -49,7 +49,6 @@ public class SessionExample {
if (!e.getMessage().contains("StorageGroupAlreadySetException")) {
throw e;
}
-// ignore duplicated set
}
createTimeseries();
@@ -60,6 +59,7 @@ public class SessionExample {
insertRecords();
nonQuery();
query();
+ queryByIterator();
deleteData();
deleteTimeseries();
session.close();
@@ -95,7 +95,8 @@ public class SessionExample {
private static void createMultiTimeseries()
throws IoTDBConnectionException, BatchExecutionException {
- if (!session.checkTimeseriesExists("root.sg1.d2.s1") &&
!session.checkTimeseriesExists("root.sg1.d2.s2")) {
+ if (!session.checkTimeseriesExists("root.sg1.d2.s1") && !session
+ .checkTimeseriesExists("root.sg1.d2.s2")) {
List<String> paths = new ArrayList<>();
paths.add("root.sg1.d2.s1");
paths.add("root.sg1.d2.s2");
@@ -126,8 +127,9 @@ public class SessionExample {
alias.add("weight1");
alias.add("weight2");
- session.createMultiTimeseries(paths, tsDataTypes, tsEncodings,
compressionTypes, null, tagsList,
- attributesList, alias);
+ session
+ .createMultiTimeseries(paths, tsDataTypes, tsEncodings,
compressionTypes, null, tagsList,
+ attributesList, alias);
}
}
@@ -245,7 +247,7 @@ public class SessionExample {
Tablet tablet1 = new Tablet("root.sg1.d1", schemaList, 100);
Tablet tablet2 = new Tablet("root.sg1.d2", schemaList, 100);
Tablet tablet3 = new Tablet("root.sg1.d3", schemaList, 100);
-
+
Map<String, Tablet> tabletMap = new HashMap<>();
tabletMap.put("root.sg1.d1", tablet1);
tabletMap.put("root.sg1.d2", tablet2);
@@ -309,7 +311,7 @@ public class SessionExample {
SessionDataSet dataSet;
dataSet = session.executeQueryStatement("select * from root.sg1.d1");
System.out.println(dataSet.getColumnNames());
- dataSet.setBatchSize(1024); // default is 512
+ dataSet.setFetchSize(1024); // default is 512
while (dataSet.hasNext()) {
System.out.println(dataSet.next());
}
@@ -317,6 +319,22 @@ public class SessionExample {
dataSet.closeOperationHandle();
}
+ private static void queryByIterator()
+ throws IoTDBConnectionException, StatementExecutionException {
+ SessionDataSet dataSet;
+ dataSet = session.executeQueryStatement("select * from root.sg1.d1");
+ DataIterator iterator = dataSet.iterator();
+ System.out.println(dataSet.getColumnNames());
+ dataSet.setFetchSize(1024); // default is 512
+ while (iterator.next()) {
+ System.out.println(String.format("%s,%s,%s,%s,%s", iterator.getLong(1),
iterator.getLong(2),
+ iterator.getLong("root.sg1.d1.s2"), iterator.getLong(4),
+ iterator.getObject("root.sg1.d1.s4")));
+ }
+
+ dataSet.closeOperationHandle();
+ }
+
private static void nonQuery() throws IoTDBConnectionException,
StatementExecutionException {
session.executeNonQueryStatement("insert into root.sg1.d1(timestamp,s1)
values(200, 1);");
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java
b/session/src/main/java/org/apache/iotdb/session/Session.java
index 7ef4d62..07245e6 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -651,7 +651,7 @@ public class Session {
}
RpcUtils.verifySuccess(execResp.getStatus());
- return new SessionDataSet(sql, execResp.getColumns(),
execResp.getDataTypeList(),
+ return new SessionDataSet(sql, execResp.getColumns(),
execResp.getDataTypeList(), execResp.columnNameIndexMap,
execResp.getQueryId(), client, sessionId, execResp.queryDataSet);
}
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
index 763922d..0ab24af 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -19,11 +19,12 @@
package org.apache.iotdb.session;
import java.nio.ByteBuffer;
-import java.sql.SQLException;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
@@ -39,34 +40,42 @@ import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.thrift.TException;
public class SessionDataSet {
+ private static final String TIMESTAMP_STR = "Time";
+ private static final int START_INDEX = 2;
+ private static final String VALUE_IS_NULL = "The value got by %s (column
name) is NULL.";
+
private boolean hasCachedRecord = false;
+ // indicate that there is no more data
+ private boolean emptyResultSet = false;
private String sql;
private long queryId;
private long sessionId;
private TSIService.Iface client;
- private int batchSize = 1024;
+ private int fetchSize = 1024;
private List<String> columnNameList;
- private List<String> columnTypeDeduplicatedList;
- // duplicated column index -> origin index
- Map<Integer, Integer> duplicateLocation;
+ protected List<TSDataType> columnTypeDeduplicatedList; // deduplicated from
columnTypeList
// column name -> column location
- Map<String, Integer> columnMap;
+ private Map<String, Integer> columnOrdinalMap;
// column size
- int columnSize = 0;
+ int columnSize;
private int rowsIndex = 0; // used to record the row index in current
TSQueryDataSet
private TSQueryDataSet tsQueryDataSet;
- private RowRecord rowRecord = null;
private byte[] currentBitmap; // used to cache the current bitmap for every
column
private static final int flag = 0x80; // used to do `or` operation with
bitmap to judge whether the value is null
+ private byte[] time; // used to cache the current time value
+ private byte[][] values; // used to cache the current row record value
+
public SessionDataSet(String sql, List<String> columnNameList, List<String>
columnTypeList,
+ Map<String, Integer> columnNameIndex,
long queryId, TSIService.Iface client, long sessionId, TSQueryDataSet
queryDataSet) {
this.sessionId = sessionId;
this.sql = sql;
@@ -76,127 +85,210 @@ public class SessionDataSet {
currentBitmap = new byte[columnNameList.size()];
columnSize = columnNameList.size();
- // deduplicate columnTypeList according to columnNameList
- this.columnTypeDeduplicatedList = new ArrayList<>();
- // duplicated column index -> origin index
- duplicateLocation = new HashMap<>();
- // column name -> column location
- columnMap = new HashMap<>();
- for (int i = 0; i < columnNameList.size(); i++) {
- String name = columnNameList.get(i);
- if (columnMap.containsKey(name)) {
- duplicateLocation.put(i, columnMap.get(name));
- } else {
- columnMap.put(name, i);
- columnTypeDeduplicatedList.add(columnTypeList.get(i));
+ this.columnNameList = new ArrayList<>();
+ this.columnNameList.add(TIMESTAMP_STR);
+ // deduplicate and map
+ this.columnOrdinalMap = new HashMap<>();
+ this.columnOrdinalMap.put(TIMESTAMP_STR, 1);
+
+ // deduplicated column name -> column location
+ Map<String, Integer> columnMap = new HashMap<>();
+
+ // deduplicate and map
+ if (columnNameIndex != null) {
+ this.columnTypeDeduplicatedList = new
ArrayList<>(columnNameIndex.size());
+ for (int i = 0; i < columnNameIndex.size(); i++) {
+ columnTypeDeduplicatedList.add(null);
+ }
+ for (int i = 0; i < columnNameList.size(); i++) {
+ String name = columnNameList.get(i);
+ this.columnNameList.add(name);
+ if (!columnOrdinalMap.containsKey(name)) {
+ int index = columnNameIndex.get(name);
+ columnMap.put(name, i);
+ columnOrdinalMap.put(name, index + START_INDEX);
+ columnTypeDeduplicatedList.set(index,
TSDataType.valueOf(columnTypeList.get(i)));
+ }
+ }
+ } else {
+ this.columnTypeDeduplicatedList = new ArrayList<>();
+ int index = START_INDEX;
+ for (int i = 0; i < columnNameList.size(); i++) {
+ String name = columnNameList.get(i);
+ this.columnNameList.add(name);
+ if (!columnOrdinalMap.containsKey(name)) {
+ columnMap.put(name, i);
+ columnOrdinalMap.put(name, index++);
+
columnTypeDeduplicatedList.add(TSDataType.valueOf(columnTypeList.get(i)));
+ }
}
}
+ time = new byte[Long.BYTES];
+ values = new byte[columnNameList.size()][];
this.tsQueryDataSet = queryDataSet;
}
- public int getBatchSize() {
- return batchSize;
+ public int getFetchSize() {
+ return fetchSize;
}
- public void setBatchSize(int batchSize) {
- this.batchSize = batchSize;
+ public void setFetchSize(int fetchSize) {
+ this.fetchSize = fetchSize;
}
public List<String> getColumnNames() {
return columnNameList;
}
- public boolean hasNext() throws IoTDBConnectionException,
StatementExecutionException {
+
+ private boolean fetchResults() throws IoTDBConnectionException,
StatementExecutionException {
+ rowsIndex = 0;
+ TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize,
queryId, true);
+ try {
+ TSFetchResultsResp resp = client.fetchResults(req);
+
+ RpcUtils.verifySuccess(resp.getStatus());
+ if (!resp.hasResultSet) {
+ emptyResultSet = true;
+ } else {
+ tsQueryDataSet = resp.getQueryDataSet();
+ }
+ return resp.hasResultSet;
+ } catch (TException e) {
+ throw new IoTDBConnectionException(
+ "Cannot fetch result from server, because of network connection: {}
", e);
+ }
+ }
+
+ private boolean hasCachedResults() {
+ return (tsQueryDataSet != null && tsQueryDataSet.time.hasRemaining());
+ }
+
+ public boolean hasNext() throws StatementExecutionException,
IoTDBConnectionException {
+
if (hasCachedRecord) {
return true;
}
- if (tsQueryDataSet == null || !tsQueryDataSet.time.hasRemaining()) {
- TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, batchSize,
queryId, true);
- try {
- TSFetchResultsResp resp = client.fetchResults(req);
- RpcUtils.verifySuccess(resp.getStatus());
-
- if (!resp.hasResultSet) {
- return false;
- } else {
- tsQueryDataSet = resp.getQueryDataSet();
- rowsIndex = 0;
- }
- } catch (TException e) {
- throw new IoTDBConnectionException(
- "Cannot fetch result from server, because of network connection:
{} ", e);
- }
+ if (hasCachedResults()) {
+ constructOneRow();
+ return true;
+ }
+ if (emptyResultSet) {
+ return false;
}
+ if (fetchResults()) {
+ constructOneRow();
+ return true;
+ }
+ return false;
+ }
- constructOneRow();
+ private void constructOneRow() {
+ tsQueryDataSet.time.get(time);
+ for (int i = 0; i < tsQueryDataSet.bitmapList.size(); i++) {
+ ByteBuffer bitmapBuffer = tsQueryDataSet.bitmapList.get(i);
+ // another new 8 row, should move the bitmap buffer position to next byte
+ if (rowsIndex % 8 == 0) {
+ currentBitmap[i] = bitmapBuffer.get();
+ }
+ values[i] = null;
+ if (!isNull(i, rowsIndex)) {
+ ByteBuffer valueBuffer = tsQueryDataSet.valueList.get(i);
+ TSDataType dataType = columnTypeDeduplicatedList.get(i);
+ switch (dataType) {
+ case BOOLEAN:
+ if (values[i] == null) {
+ values[i] = new byte[1];
+ }
+ valueBuffer.get(values[i]);
+ break;
+ case INT32:
+ if (values[i] == null) {
+ values[i] = new byte[Integer.BYTES];
+ }
+ valueBuffer.get(values[i]);
+ break;
+ case INT64:
+ if (values[i] == null) {
+ values[i] = new byte[Long.BYTES];
+ }
+ valueBuffer.get(values[i]);
+ break;
+ case FLOAT:
+ if (values[i] == null) {
+ values[i] = new byte[Float.BYTES];
+ }
+ valueBuffer.get(values[i]);
+ break;
+ case DOUBLE:
+ if (values[i] == null) {
+ values[i] = new byte[Double.BYTES];
+ }
+ valueBuffer.get(values[i]);
+ break;
+ case TEXT:
+ int length = valueBuffer.getInt();
+ values[i] = ReadWriteIOUtils.readBytes(valueBuffer, length);
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String
+ .format("Data type %s is not supported.",
columnTypeDeduplicatedList.get(i)));
+ }
+ }
+ }
+ rowsIndex++;
hasCachedRecord = true;
- return true;
}
-
- private void constructOneRow() {
+ private RowRecord constructRowRecordFromValueArray() {
List<Field> outFields = new ArrayList<>();
- int loc = 0;
for (int i = 0; i < columnSize; i++) {
Field field;
- if (duplicateLocation.containsKey(i)) {
- field = Field.copy(outFields.get(duplicateLocation.get(i)));
- } else {
- ByteBuffer bitmapBuffer = tsQueryDataSet.bitmapList.get(loc);
- // another new 8 row, should move the bitmap buffer position to next
byte
- if (rowsIndex % 8 == 0) {
- currentBitmap[loc] = bitmapBuffer.get();
- }
+ int loc = columnOrdinalMap.get(columnNameList.get(i + 1)) - START_INDEX;
+ byte[] valueBytes = values[loc];
- if (!isNull(loc, rowsIndex)) {
- ByteBuffer valueBuffer = tsQueryDataSet.valueList.get(loc);
- TSDataType dataType =
TSDataType.valueOf(columnTypeDeduplicatedList.get(loc));
- field = new Field(dataType);
- switch (dataType) {
- case BOOLEAN:
- boolean booleanValue = BytesUtils.byteToBool(valueBuffer.get());
- field.setBoolV(booleanValue);
- break;
- case INT32:
- int intValue = valueBuffer.getInt();
- field.setIntV(intValue);
- break;
- case INT64:
- long longValue = valueBuffer.getLong();
- field.setLongV(longValue);
- break;
- case FLOAT:
- float floatValue = valueBuffer.getFloat();
- field.setFloatV(floatValue);
- break;
- case DOUBLE:
- double doubleValue = valueBuffer.getDouble();
- field.setDoubleV(doubleValue);
- break;
- case TEXT:
- int binarySize = valueBuffer.getInt();
- byte[] binaryValue = new byte[binarySize];
- valueBuffer.get(binaryValue);
- field.setBinaryV(new Binary(binaryValue));
- break;
- default:
- throw new UnSupportedDataTypeException(String
- .format("Data type %s is not supported.",
columnTypeDeduplicatedList.get(i)));
- }
- } else {
- field = new Field(null);
+ if (valueBytes != null) {
+ TSDataType dataType = columnTypeDeduplicatedList.get(loc);
+ field = new Field(dataType);
+ switch (dataType) {
+ case BOOLEAN:
+ boolean booleanValue = BytesUtils.bytesToBool(valueBytes);
+ field.setBoolV(booleanValue);
+ break;
+ case INT32:
+ int intValue = BytesUtils.bytesToInt(valueBytes);
+ field.setIntV(intValue);
+ break;
+ case INT64:
+ long longValue = BytesUtils.bytesToLong(valueBytes);
+ field.setLongV(longValue);
+ break;
+ case FLOAT:
+ float floatValue = BytesUtils.bytesToFloat(valueBytes);
+ field.setFloatV(floatValue);
+ break;
+ case DOUBLE:
+ double doubleValue = BytesUtils.bytesToDouble(valueBytes);
+ field.setDoubleV(doubleValue);
+ break;
+ case TEXT:
+ field.setBinaryV(new Binary(valueBytes));
+ break;
+ default:
+ throw new UnSupportedDataTypeException(String
+ .format("Data type %s is not supported.",
columnTypeDeduplicatedList.get(i)));
}
- loc++;
+ } else {
+ field = new Field(null);
}
outFields.add(field);
}
-
- rowRecord = new RowRecord(tsQueryDataSet.time.getLong(), outFields);
- rowsIndex++;
+ return new RowRecord(BytesUtils.bytesToLong(time), outFields);
}
/**
@@ -216,9 +308,9 @@ public class SessionDataSet {
return null;
}
}
-
hasCachedRecord = false;
- return rowRecord;
+
+ return constructRowRecordFromValueArray();
}
public void closeOperationHandle() throws StatementExecutionException,
IoTDBConnectionException {
@@ -232,4 +324,175 @@ public class SessionDataSet {
"Error occurs when connecting to server for close operation,
because: " + e, e);
}
}
+
+ public DataIterator iterator() {
+ return new DataIterator();
+ }
+
+ public class DataIterator {
+
+ public boolean next() throws StatementExecutionException,
IoTDBConnectionException {
+ if (hasCachedResults()) {
+ constructOneRow();
+ return true;
+ }
+ if (emptyResultSet) {
+ return false;
+ }
+ if (fetchResults()) {
+ constructOneRow();
+ return true;
+ }
+ return false;
+ }
+
+ public boolean getBoolean(int columnIndex) throws
StatementExecutionException {
+ return getBoolean(findColumnNameByIndex(columnIndex));
+ }
+
+ public boolean getBoolean(String columnName) throws
StatementExecutionException {
+ checkRecord();
+ int index = columnOrdinalMap.get(columnName) - START_INDEX;
+ if (values[index] != null) {
+ return BytesUtils.bytesToBool(values[index]);
+ } else {
+ throw new StatementExecutionException(String.format(VALUE_IS_NULL,
columnName));
+ }
+ }
+
+ public double getDouble(int columnIndex) throws
StatementExecutionException {
+ return getDouble(findColumnNameByIndex(columnIndex));
+ }
+
+ public double getDouble(String columnName) throws
StatementExecutionException {
+ checkRecord();
+ int index = columnOrdinalMap.get(columnName) - START_INDEX;
+ if (values[index] != null) {
+ return BytesUtils.bytesToDouble(values[index]);
+ } else {
+ throw new StatementExecutionException(String.format(VALUE_IS_NULL,
columnName));
+ }
+ }
+
+ public float getFloat(int columnIndex) throws StatementExecutionException {
+ return getFloat(findColumnNameByIndex(columnIndex));
+ }
+
+ public float getFloat(String columnName) throws
StatementExecutionException {
+ checkRecord();
+ int index = columnOrdinalMap.get(columnName) - START_INDEX;
+ if (values[index] != null) {
+ return BytesUtils.bytesToFloat(values[index]);
+ } else {
+ throw new StatementExecutionException(String.format(VALUE_IS_NULL,
columnName));
+ }
+ }
+
+ public int getInt(int columnIndex) throws StatementExecutionException {
+ return getInt(findColumnNameByIndex(columnIndex));
+ }
+
+ public int getInt(String columnName) throws StatementExecutionException {
+ checkRecord();
+ int index = columnOrdinalMap.get(columnName) - START_INDEX;
+ if (values[index] != null) {
+ return BytesUtils.bytesToInt(values[index]);
+ } else {
+ throw new StatementExecutionException(String.format(VALUE_IS_NULL,
columnName));
+ }
+ }
+
+ public long getLong(int columnIndex) throws StatementExecutionException {
+ return getLong(findColumnNameByIndex(columnIndex));
+ }
+
+ public long getLong(String columnName) throws StatementExecutionException {
+ checkRecord();
+ if (columnName.equals(TIMESTAMP_STR)) {
+ return BytesUtils.bytesToLong(time);
+ }
+ int index = columnOrdinalMap.get(columnName) - START_INDEX;
+ if (values[index] != null) {
+ return BytesUtils.bytesToLong(values[index]);
+ } else {
+ throw new StatementExecutionException(String.format(VALUE_IS_NULL,
columnName));
+ }
+ }
+
+ public Object getObject(int columnIndex) throws
StatementExecutionException {
+ return getObject(findColumnNameByIndex(columnIndex));
+ }
+
+ public Object getObject(String columnName) throws
StatementExecutionException {
+ return getValueByName(columnName);
+ }
+
+ public String getString(int columnIndex) throws
StatementExecutionException {
+ return getString(findColumnNameByIndex(columnIndex));
+ }
+
+ public String getString(String columnName) throws
StatementExecutionException {
+ return getValueByName(columnName);
+ }
+
+ public Timestamp getTimestamp(int columnIndex) throws
StatementExecutionException {
+ return new Timestamp(getLong(columnIndex));
+ }
+
+ public Timestamp getTimestamp(String columnName) throws
StatementExecutionException {
+ return getTimestamp(findColumn(columnName));
+ }
+
+ public int findColumn(String columnName) {
+ return columnOrdinalMap.get(columnName);
+ }
+
+ private String getValueByName(String columnName) throws
StatementExecutionException {
+ checkRecord();
+ if (columnName.equals(TIMESTAMP_STR)) {
+ return String.valueOf(BytesUtils.bytesToLong(time));
+ }
+ int index = columnOrdinalMap.get(columnName) - START_INDEX;
+ if (index < 0 || index >= values.length || values[index] == null) {
+ return null;
+ }
+ return getString(index, columnTypeDeduplicatedList.get(index), values);
+ }
+
+ protected String getString(int index, TSDataType tsDataType, byte[][]
values) {
+ switch (tsDataType) {
+ case BOOLEAN:
+ return String.valueOf(BytesUtils.bytesToBool(values[index]));
+ case INT32:
+ return String.valueOf(BytesUtils.bytesToInt(values[index]));
+ case INT64:
+ return String.valueOf(BytesUtils.bytesToLong(values[index]));
+ case FLOAT:
+ return String.valueOf(BytesUtils.bytesToFloat(values[index]));
+ case DOUBLE:
+ return String.valueOf(BytesUtils.bytesToDouble(values[index]));
+ case TEXT:
+ return new String(values[index]);
+ default:
+ return null;
+ }
+ }
+
+ private void checkRecord() throws StatementExecutionException {
+ if (Objects.isNull(tsQueryDataSet)) {
+ throw new StatementExecutionException("No record remains");
+ }
+ }
+ }
+
+ private String findColumnNameByIndex(int columnIndex) throws
StatementExecutionException {
+ if (columnIndex <= 0) {
+ throw new StatementExecutionException("column index should start from
1");
+ }
+ if (columnIndex > columnNameList.size()) {
+ throw new StatementExecutionException(
+ String.format("column index %d out of range %d", columnIndex,
columnNameList.size()));
+ }
+ return columnNameList.get(columnIndex - 1);
+ }
}
diff --git
a/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java
b/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java
index c1cc7e2..b8fa8d6 100644
---
a/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java
+++
b/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java
@@ -43,11 +43,11 @@ public class SessionDataSetWrapper {
}
public int getBatchSize() {
- return sessionDataSet.getBatchSize();
+ return sessionDataSet.getFetchSize();
}
public void setBatchSize(int batchSize) {
- sessionDataSet.setBatchSize(batchSize);
+ sessionDataSet.setFetchSize(batchSize);
}
/**
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
index 1940e35..8cfb4d1 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
@@ -603,7 +603,7 @@ public class IoTDBSessionIT {
throws StatementExecutionException, IoTDBConnectionException {
SessionDataSet sessionDataSet = session
.executeQueryStatement("select '11', s1, '11' from root.sg1.d1 align
by device");
- sessionDataSet.setBatchSize(1024);
+ sessionDataSet.setFetchSize(1024);
int count = 0;
while (sessionDataSet.hasNext()) {
count++;
@@ -622,7 +622,7 @@ public class IoTDBSessionIT {
throws IoTDBConnectionException, StatementExecutionException {
SessionDataSet sessionDataSet = session.executeQueryStatement(
"select '11', s1, '11', s5, s1, s5 from root.sg1.d1 align by device");
- sessionDataSet.setBatchSize(1024);
+ sessionDataSet.setFetchSize(1024);
int count = 0;
while (sessionDataSet.hasNext()) {
count++;
@@ -705,7 +705,7 @@ public class IoTDBSessionIT {
private void query4() throws IoTDBConnectionException,
StatementExecutionException {
SessionDataSet sessionDataSet = session.executeQueryStatement("select *
from root.sg1.d2");
- sessionDataSet.setBatchSize(1024);
+ sessionDataSet.setFetchSize(1024);
int count = 0;
while (sessionDataSet.hasNext()) {
long index = 1;
@@ -722,7 +722,7 @@ public class IoTDBSessionIT {
private void query3() throws IoTDBConnectionException,
StatementExecutionException {
SessionDataSet sessionDataSet = session.executeQueryStatement("select *
from root.sg1.d1");
- sessionDataSet.setBatchSize(1024);
+ sessionDataSet.setFetchSize(1024);
int count = 0;
while (sessionDataSet.hasNext()) {
long index = 1;
diff --git
a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java
b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java
new file mode 100644
index 0000000..8d3b3fd
--- /dev/null
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.SessionDataSet.DataIterator;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IoTDBSessionIteratorIT {
+
+ private Session session;
+
+ @Before
+ public void setUp() throws Exception {
+ System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/");
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.envSetUp();
+ prepareData();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ session.close();
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void test() {
+ String[] retArray = new String[]{
+ "0,1,2.0,null",
+ "1,1,2.0,null",
+ "2,1,2.0,null",
+ "3,1,2.0,null",
+ "4,1,2.0,null",
+ "5,1,2.0,4.0",
+ "6,1,2.0,4.0",
+ "7,1,2.0,4.0",
+ "8,1,2.0,4.0",
+ "9,1,2.0,4.0",
+ };
+
+ try {
+ SessionDataSet sessionDataSet = session.executeQueryStatement("select *
from root.sg1");
+ sessionDataSet.setFetchSize(1024);
+ DataIterator iterator = sessionDataSet.iterator();
+ int count = 0;
+ while (iterator.next()) {
+ String ans = String.format("%s,%s,%s,%s", iterator.getLong(1),
iterator.getInt("root.sg1.d1.s1"),
+ iterator.getFloat(3), iterator.getString("root.sg1.d2.s1"));
+ assertEquals(retArray[count], ans);
+ count++;
+ }
+ assertEquals(retArray.length, count);
+ sessionDataSet.closeOperationHandle();
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ private void prepareData() throws IoTDBConnectionException,
StatementExecutionException {
+ session = new Session("127.0.0.1", 6667, "root", "root");
+ session.open();
+
+ session.setStorageGroup("root.sg1");
+ session.createTimeseries("root.sg1.d1.s1", TSDataType.INT32,
TSEncoding.RLE,
+ CompressionType.SNAPPY);
+ session.createTimeseries("root.sg1.d1.s2", TSDataType.FLOAT,
TSEncoding.RLE,
+ CompressionType.SNAPPY);
+ session.createTimeseries("root.sg1.d2.s1", TSDataType.DOUBLE,
TSEncoding.RLE,
+ CompressionType.SNAPPY);
+ String deviceId = "root.sg1.d1";
+ List<String> measurements = new ArrayList<>();
+ measurements.add("s1");
+ measurements.add("s2");
+ for (long time = 0; time < 10; time++) {
+ List<String> values = new ArrayList<>();
+ values.add("1");
+ values.add("2");
+ session.insertRecord(deviceId, time, measurements, values);
+ }
+
+ deviceId = "root.sg1.d2";
+ measurements = new ArrayList<>();
+ measurements.add("s1");
+ for (long time = 5; time < 10; time++) {
+ List<String> values = new ArrayList<>();
+ values.add("4");
+ session.insertRecord(deviceId, time, measurements, values);
+ }
+ }
+
+}