This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fe46eb  [IOTDB-630] Add a jdbc-like way to fetch data in session (#1129)
0fe46eb is described below

commit 0fe46eb2802863e66827b33f4a3ed6df0130cd89
Author: Jackie Tien <[email protected]>
AuthorDate: Fri May 8 13:30:02 2020 +0800

    [IOTDB-630] Add a jdbc-like way to fetch data in session (#1129)
    
    * Add a jdbc-like way to fetch data in session
---
 .../main/java/org/apache/iotdb/SessionExample.java |  40 +-
 .../java/org/apache/iotdb/session/Session.java     |   2 +-
 .../org/apache/iotdb/session/SessionDataSet.java   | 459 ++++++++++++++++-----
 .../iotdb/session/pool/SessionDataSetWrapper.java  |   4 +-
 .../org/apache/iotdb/session/IoTDBSessionIT.java   |   8 +-
 .../iotdb/session/IoTDBSessionIteratorIT.java      | 122 ++++++
 6 files changed, 519 insertions(+), 116 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 6aa0e25..57d0a64 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -18,21 +18,21 @@
  */
 package org.apache.iotdb;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.iotdb.rpc.BatchExecutionException;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.session.SessionDataSet.DataIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.Schema;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
 public class SessionExample {
 
@@ -49,7 +49,6 @@ public class SessionExample {
       if (!e.getMessage().contains("StorageGroupAlreadySetException")) {
         throw e;
       }
-//       ignore duplicated set
     }
 
     createTimeseries();
@@ -60,6 +59,7 @@ public class SessionExample {
     insertRecords();
     nonQuery();
     query();
+    queryByIterator();
     deleteData();
     deleteTimeseries();
     session.close();
@@ -95,7 +95,8 @@ public class SessionExample {
   private static void createMultiTimeseries()
       throws IoTDBConnectionException, BatchExecutionException {
 
-    if (!session.checkTimeseriesExists("root.sg1.d2.s1") && 
!session.checkTimeseriesExists("root.sg1.d2.s2")) {
+    if (!session.checkTimeseriesExists("root.sg1.d2.s1") && !session
+        .checkTimeseriesExists("root.sg1.d2.s2")) {
       List<String> paths = new ArrayList<>();
       paths.add("root.sg1.d2.s1");
       paths.add("root.sg1.d2.s2");
@@ -126,8 +127,9 @@ public class SessionExample {
       alias.add("weight1");
       alias.add("weight2");
 
-      session.createMultiTimeseries(paths, tsDataTypes, tsEncodings, 
compressionTypes, null, tagsList,
-          attributesList, alias);
+      session
+          .createMultiTimeseries(paths, tsDataTypes, tsEncodings, 
compressionTypes, null, tagsList,
+              attributesList, alias);
     }
   }
 
@@ -245,7 +247,7 @@ public class SessionExample {
     Tablet tablet1 = new Tablet("root.sg1.d1", schemaList, 100);
     Tablet tablet2 = new Tablet("root.sg1.d2", schemaList, 100);
     Tablet tablet3 = new Tablet("root.sg1.d3", schemaList, 100);
-    
+
     Map<String, Tablet> tabletMap = new HashMap<>();
     tabletMap.put("root.sg1.d1", tablet1);
     tabletMap.put("root.sg1.d2", tablet2);
@@ -309,7 +311,7 @@ public class SessionExample {
     SessionDataSet dataSet;
     dataSet = session.executeQueryStatement("select * from root.sg1.d1");
     System.out.println(dataSet.getColumnNames());
-    dataSet.setBatchSize(1024); // default is 512
+    dataSet.setFetchSize(1024); // default is 512
     while (dataSet.hasNext()) {
       System.out.println(dataSet.next());
     }
@@ -317,6 +319,22 @@ public class SessionExample {
     dataSet.closeOperationHandle();
   }
 
+  private static void queryByIterator()
+      throws IoTDBConnectionException, StatementExecutionException {
+    SessionDataSet dataSet;
+    dataSet = session.executeQueryStatement("select * from root.sg1.d1");
+    DataIterator iterator = dataSet.iterator();
+    System.out.println(dataSet.getColumnNames());
+    dataSet.setFetchSize(1024); // default is 512
+    while (iterator.next()) {
+      System.out.println(String.format("%s,%s,%s,%s,%s", iterator.getLong(1), 
iterator.getLong(2),
+          iterator.getLong("root.sg1.d1.s2"), iterator.getLong(4),
+          iterator.getObject("root.sg1.d1.s4")));
+    }
+
+    dataSet.closeOperationHandle();
+  }
+
   private static void nonQuery() throws IoTDBConnectionException, 
StatementExecutionException {
     session.executeNonQueryStatement("insert into root.sg1.d1(timestamp,s1) 
values(200, 1);");
   }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 7ef4d62..07245e6 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -651,7 +651,7 @@ public class Session {
     }
 
     RpcUtils.verifySuccess(execResp.getStatus());
-    return new SessionDataSet(sql, execResp.getColumns(), 
execResp.getDataTypeList(),
+    return new SessionDataSet(sql, execResp.getColumns(), 
execResp.getDataTypeList(), execResp.columnNameIndexMap,
         execResp.getQueryId(), client, sessionId, execResp.queryDataSet);
   }
 
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
index 763922d..0ab24af 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -19,11 +19,12 @@
 package org.apache.iotdb.session;
 
 import java.nio.ByteBuffer;
-import java.sql.SQLException;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
@@ -39,34 +40,42 @@ import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.thrift.TException;
 
 public class SessionDataSet {
 
+  private static final String TIMESTAMP_STR = "Time";
+  private static final int START_INDEX = 2;
+  private static final String VALUE_IS_NULL = "The value got by %s (column 
name) is NULL.";
+
   private boolean hasCachedRecord = false;
+  // indicate that there is no more data
+  private boolean emptyResultSet = false;
   private String sql;
   private long queryId;
   private long sessionId;
   private TSIService.Iface client;
-  private int batchSize = 1024;
+  private int fetchSize = 1024;
   private List<String> columnNameList;
-  private List<String> columnTypeDeduplicatedList;
-  // duplicated column index -> origin index
-  Map<Integer, Integer> duplicateLocation;
+  protected List<TSDataType> columnTypeDeduplicatedList; // deduplicated from 
columnTypeList
   // column name -> column location
-  Map<String, Integer> columnMap;
+  private Map<String, Integer> columnOrdinalMap;
   // column size
-  int columnSize = 0;
+  int columnSize;
 
 
   private int rowsIndex = 0; // used to record the row index in current 
TSQueryDataSet
   private TSQueryDataSet tsQueryDataSet;
-  private RowRecord rowRecord = null;
   private byte[] currentBitmap; // used to cache the current bitmap for every 
column
   private static final int flag = 0x80; // used to do `or` operation with 
bitmap to judge whether the value is null
 
+  private byte[] time; // used to cache the current time value
+  private byte[][] values; // used to cache the current row record value
+
 
   public SessionDataSet(String sql, List<String> columnNameList, List<String> 
columnTypeList,
+      Map<String, Integer> columnNameIndex,
       long queryId, TSIService.Iface client, long sessionId, TSQueryDataSet 
queryDataSet) {
     this.sessionId = sessionId;
     this.sql = sql;
@@ -76,127 +85,210 @@ public class SessionDataSet {
     currentBitmap = new byte[columnNameList.size()];
     columnSize = columnNameList.size();
 
-    // deduplicate columnTypeList according to columnNameList
-    this.columnTypeDeduplicatedList = new ArrayList<>();
-    // duplicated column index -> origin index
-    duplicateLocation = new HashMap<>();
-    // column name -> column location
-    columnMap = new HashMap<>();
-    for (int i = 0; i < columnNameList.size(); i++) {
-      String name = columnNameList.get(i);
-      if (columnMap.containsKey(name)) {
-        duplicateLocation.put(i, columnMap.get(name));
-      } else {
-        columnMap.put(name, i);
-        columnTypeDeduplicatedList.add(columnTypeList.get(i));
+    this.columnNameList = new ArrayList<>();
+    this.columnNameList.add(TIMESTAMP_STR);
+    // deduplicate and map
+    this.columnOrdinalMap = new HashMap<>();
+    this.columnOrdinalMap.put(TIMESTAMP_STR, 1);
+
+    // deduplicated column name -> column location
+    Map<String, Integer> columnMap = new HashMap<>();
+
+    // deduplicate and map
+    if (columnNameIndex != null) {
+      this.columnTypeDeduplicatedList = new 
ArrayList<>(columnNameIndex.size());
+      for (int i = 0; i < columnNameIndex.size(); i++) {
+        columnTypeDeduplicatedList.add(null);
+      }
+      for (int i = 0; i < columnNameList.size(); i++) {
+        String name = columnNameList.get(i);
+        this.columnNameList.add(name);
+        if (!columnOrdinalMap.containsKey(name)) {
+          int index = columnNameIndex.get(name);
+          columnMap.put(name, i);
+          columnOrdinalMap.put(name, index + START_INDEX);
+          columnTypeDeduplicatedList.set(index, 
TSDataType.valueOf(columnTypeList.get(i)));
+        }
+      }
+    } else {
+      this.columnTypeDeduplicatedList = new ArrayList<>();
+      int index = START_INDEX;
+      for (int i = 0; i < columnNameList.size(); i++) {
+        String name = columnNameList.get(i);
+        this.columnNameList.add(name);
+        if (!columnOrdinalMap.containsKey(name)) {
+          columnMap.put(name, i);
+          columnOrdinalMap.put(name, index++);
+          
columnTypeDeduplicatedList.add(TSDataType.valueOf(columnTypeList.get(i)));
+        }
       }
     }
 
+    time = new byte[Long.BYTES];
+    values = new byte[columnNameList.size()][];
     this.tsQueryDataSet = queryDataSet;
   }
 
-  public int getBatchSize() {
-    return batchSize;
+  public int getFetchSize() {
+    return fetchSize;
   }
 
-  public void setBatchSize(int batchSize) {
-    this.batchSize = batchSize;
+  public void setFetchSize(int fetchSize) {
+    this.fetchSize = fetchSize;
   }
 
   public List<String> getColumnNames() {
     return columnNameList;
   }
 
-  public boolean hasNext() throws IoTDBConnectionException, 
StatementExecutionException {
+
+  private boolean fetchResults() throws IoTDBConnectionException, 
StatementExecutionException {
+    rowsIndex = 0;
+    TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, 
queryId, true);
+    try {
+      TSFetchResultsResp resp = client.fetchResults(req);
+
+      RpcUtils.verifySuccess(resp.getStatus());
+      if (!resp.hasResultSet) {
+        emptyResultSet = true;
+      } else {
+        tsQueryDataSet = resp.getQueryDataSet();
+      }
+      return resp.hasResultSet;
+    } catch (TException e) {
+      throw new IoTDBConnectionException(
+          "Cannot fetch result from server, because of network connection: {} 
", e);
+    }
+  }
+
+  private boolean hasCachedResults() {
+    return (tsQueryDataSet != null && tsQueryDataSet.time.hasRemaining());
+  }
+
+  public boolean hasNext() throws StatementExecutionException, 
IoTDBConnectionException {
+
     if (hasCachedRecord) {
       return true;
     }
-    if (tsQueryDataSet == null || !tsQueryDataSet.time.hasRemaining()) {
-      TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, batchSize, 
queryId, true);
-      try {
-        TSFetchResultsResp resp = client.fetchResults(req);
-        RpcUtils.verifySuccess(resp.getStatus());
-
-        if (!resp.hasResultSet) {
-          return false;
-        } else {
-          tsQueryDataSet = resp.getQueryDataSet();
-          rowsIndex = 0;
-        }
-      } catch (TException e) {
-        throw new IoTDBConnectionException(
-            "Cannot fetch result from server, because of network connection: 
{} ", e);
-      }
 
+    if (hasCachedResults()) {
+      constructOneRow();
+      return true;
+    }
+    if (emptyResultSet) {
+      return false;
     }
+    if (fetchResults()) {
+      constructOneRow();
+      return true;
+    }
+    return false;
+  }
 
-    constructOneRow();
+  private void constructOneRow() {
+    tsQueryDataSet.time.get(time);
+    for (int i = 0; i < tsQueryDataSet.bitmapList.size(); i++) {
+      ByteBuffer bitmapBuffer = tsQueryDataSet.bitmapList.get(i);
+      // another new 8 row, should move the bitmap buffer position to next byte
+      if (rowsIndex % 8 == 0) {
+        currentBitmap[i] = bitmapBuffer.get();
+      }
+      values[i] = null;
+      if (!isNull(i, rowsIndex)) {
+        ByteBuffer valueBuffer = tsQueryDataSet.valueList.get(i);
+        TSDataType dataType = columnTypeDeduplicatedList.get(i);
+        switch (dataType) {
+          case BOOLEAN:
+            if (values[i] == null) {
+              values[i] = new byte[1];
+            }
+            valueBuffer.get(values[i]);
+            break;
+          case INT32:
+            if (values[i] == null) {
+              values[i] = new byte[Integer.BYTES];
+            }
+            valueBuffer.get(values[i]);
+            break;
+          case INT64:
+            if (values[i] == null) {
+              values[i] = new byte[Long.BYTES];
+            }
+            valueBuffer.get(values[i]);
+            break;
+          case FLOAT:
+            if (values[i] == null) {
+              values[i] = new byte[Float.BYTES];
+            }
+            valueBuffer.get(values[i]);
+            break;
+          case DOUBLE:
+            if (values[i] == null) {
+              values[i] = new byte[Double.BYTES];
+            }
+            valueBuffer.get(values[i]);
+            break;
+          case TEXT:
+            int length = valueBuffer.getInt();
+            values[i] = ReadWriteIOUtils.readBytes(valueBuffer, length);
+            break;
+          default:
+            throw new UnSupportedDataTypeException(
+                String
+                    .format("Data type %s is not supported.", 
columnTypeDeduplicatedList.get(i)));
+        }
+      }
+    }
+    rowsIndex++;
     hasCachedRecord = true;
-    return true;
   }
 
 
-
-  private void constructOneRow() {
+  private RowRecord constructRowRecordFromValueArray() {
     List<Field> outFields = new ArrayList<>();
-    int loc = 0;
     for (int i = 0; i < columnSize; i++) {
       Field field;
 
-      if (duplicateLocation.containsKey(i)) {
-        field = Field.copy(outFields.get(duplicateLocation.get(i)));
-      } else {
-        ByteBuffer bitmapBuffer = tsQueryDataSet.bitmapList.get(loc);
-        // another new 8 row, should move the bitmap buffer position to next 
byte
-        if (rowsIndex % 8 == 0) {
-          currentBitmap[loc] = bitmapBuffer.get();
-        }
+      int loc = columnOrdinalMap.get(columnNameList.get(i + 1)) - START_INDEX;
+      byte[] valueBytes = values[loc];
 
-        if (!isNull(loc, rowsIndex)) {
-          ByteBuffer valueBuffer = tsQueryDataSet.valueList.get(loc);
-          TSDataType dataType = 
TSDataType.valueOf(columnTypeDeduplicatedList.get(loc));
-          field = new Field(dataType);
-          switch (dataType) {
-            case BOOLEAN:
-              boolean booleanValue = BytesUtils.byteToBool(valueBuffer.get());
-              field.setBoolV(booleanValue);
-              break;
-            case INT32:
-              int intValue = valueBuffer.getInt();
-              field.setIntV(intValue);
-              break;
-            case INT64:
-              long longValue = valueBuffer.getLong();
-              field.setLongV(longValue);
-              break;
-            case FLOAT:
-              float floatValue = valueBuffer.getFloat();
-              field.setFloatV(floatValue);
-              break;
-            case DOUBLE:
-              double doubleValue = valueBuffer.getDouble();
-              field.setDoubleV(doubleValue);
-              break;
-            case TEXT:
-              int binarySize = valueBuffer.getInt();
-              byte[] binaryValue = new byte[binarySize];
-              valueBuffer.get(binaryValue);
-              field.setBinaryV(new Binary(binaryValue));
-              break;
-            default:
-              throw new UnSupportedDataTypeException(String
-                  .format("Data type %s is not supported.", 
columnTypeDeduplicatedList.get(i)));
-          }
-        } else {
-          field = new Field(null);
+      if (valueBytes != null) {
+        TSDataType dataType = columnTypeDeduplicatedList.get(loc);
+        field = new Field(dataType);
+        switch (dataType) {
+          case BOOLEAN:
+            boolean booleanValue = BytesUtils.bytesToBool(valueBytes);
+            field.setBoolV(booleanValue);
+            break;
+          case INT32:
+            int intValue = BytesUtils.bytesToInt(valueBytes);
+            field.setIntV(intValue);
+            break;
+          case INT64:
+            long longValue = BytesUtils.bytesToLong(valueBytes);
+            field.setLongV(longValue);
+            break;
+          case FLOAT:
+            float floatValue = BytesUtils.bytesToFloat(valueBytes);
+            field.setFloatV(floatValue);
+            break;
+          case DOUBLE:
+            double doubleValue = BytesUtils.bytesToDouble(valueBytes);
+            field.setDoubleV(doubleValue);
+            break;
+          case TEXT:
+            field.setBinaryV(new Binary(valueBytes));
+            break;
+          default:
+            throw new UnSupportedDataTypeException(String
+                .format("Data type %s is not supported.", 
columnTypeDeduplicatedList.get(i)));
         }
-        loc++;
+      } else {
+        field = new Field(null);
       }
       outFields.add(field);
     }
-
-    rowRecord = new RowRecord(tsQueryDataSet.time.getLong(), outFields);
-    rowsIndex++;
+    return new RowRecord(BytesUtils.bytesToLong(time), outFields);
   }
 
   /**
@@ -216,9 +308,9 @@ public class SessionDataSet {
         return null;
       }
     }
-
     hasCachedRecord = false;
-    return rowRecord;
+
+    return constructRowRecordFromValueArray();
   }
 
   public void closeOperationHandle() throws StatementExecutionException, 
IoTDBConnectionException {
@@ -232,4 +324,175 @@ public class SessionDataSet {
           "Error occurs when connecting to server for close operation, 
because: " + e, e);
     }
   }
+
+  public DataIterator iterator() {
+    return new DataIterator();
+  }
+
+  public class DataIterator {
+
+    public boolean next() throws StatementExecutionException, 
IoTDBConnectionException {
+      if (hasCachedResults()) {
+        constructOneRow();
+        return true;
+      }
+      if (emptyResultSet) {
+        return false;
+      }
+      if (fetchResults()) {
+        constructOneRow();
+        return true;
+      }
+      return false;
+    }
+
+    public boolean getBoolean(int columnIndex) throws 
StatementExecutionException {
+      return getBoolean(findColumnNameByIndex(columnIndex));
+    }
+
+    public boolean getBoolean(String columnName) throws 
StatementExecutionException {
+      checkRecord();
+      int index = columnOrdinalMap.get(columnName) - START_INDEX;
+      if (values[index] != null) {
+        return BytesUtils.bytesToBool(values[index]);
+      } else {
+        throw new StatementExecutionException(String.format(VALUE_IS_NULL, 
columnName));
+      }
+    }
+
+    public double getDouble(int columnIndex) throws 
StatementExecutionException {
+      return getDouble(findColumnNameByIndex(columnIndex));
+    }
+
+    public double getDouble(String columnName) throws 
StatementExecutionException {
+      checkRecord();
+      int index = columnOrdinalMap.get(columnName) - START_INDEX;
+      if (values[index] != null) {
+        return BytesUtils.bytesToDouble(values[index]);
+      } else {
+        throw new StatementExecutionException(String.format(VALUE_IS_NULL, 
columnName));
+      }
+    }
+
+    public float getFloat(int columnIndex) throws StatementExecutionException {
+      return getFloat(findColumnNameByIndex(columnIndex));
+    }
+
+    public float getFloat(String columnName) throws 
StatementExecutionException {
+      checkRecord();
+      int index = columnOrdinalMap.get(columnName) - START_INDEX;
+      if (values[index] != null) {
+        return BytesUtils.bytesToFloat(values[index]);
+      } else {
+        throw new StatementExecutionException(String.format(VALUE_IS_NULL, 
columnName));
+      }
+    }
+
+    public int getInt(int columnIndex) throws StatementExecutionException {
+      return getInt(findColumnNameByIndex(columnIndex));
+    }
+
+    public int getInt(String columnName) throws StatementExecutionException {
+      checkRecord();
+      int index = columnOrdinalMap.get(columnName) - START_INDEX;
+      if (values[index] != null) {
+        return BytesUtils.bytesToInt(values[index]);
+      } else {
+        throw new StatementExecutionException(String.format(VALUE_IS_NULL, 
columnName));
+      }
+    }
+
+    public long getLong(int columnIndex) throws StatementExecutionException {
+      return getLong(findColumnNameByIndex(columnIndex));
+    }
+
+    public long getLong(String columnName) throws StatementExecutionException {
+      checkRecord();
+      if (columnName.equals(TIMESTAMP_STR)) {
+        return BytesUtils.bytesToLong(time);
+      }
+      int index = columnOrdinalMap.get(columnName) - START_INDEX;
+      if (values[index] != null) {
+        return BytesUtils.bytesToLong(values[index]);
+      } else {
+        throw new StatementExecutionException(String.format(VALUE_IS_NULL, 
columnName));
+      }
+    }
+
+    public Object getObject(int columnIndex) throws 
StatementExecutionException {
+      return getObject(findColumnNameByIndex(columnIndex));
+    }
+
+    public Object getObject(String columnName) throws 
StatementExecutionException {
+      return getValueByName(columnName);
+    }
+
+    public String getString(int columnIndex) throws 
StatementExecutionException {
+      return getString(findColumnNameByIndex(columnIndex));
+    }
+
+    public String getString(String columnName) throws 
StatementExecutionException {
+      return getValueByName(columnName);
+    }
+
+    public Timestamp getTimestamp(int columnIndex) throws 
StatementExecutionException {
+      return new Timestamp(getLong(columnIndex));
+    }
+
+    public Timestamp getTimestamp(String columnName) throws 
StatementExecutionException {
+      return getTimestamp(findColumn(columnName));
+    }
+
+    public int findColumn(String columnName) {
+      return columnOrdinalMap.get(columnName);
+    }
+
+    private String getValueByName(String columnName) throws 
StatementExecutionException {
+      checkRecord();
+      if (columnName.equals(TIMESTAMP_STR)) {
+        return String.valueOf(BytesUtils.bytesToLong(time));
+      }
+      int index = columnOrdinalMap.get(columnName) - START_INDEX;
+      if (index < 0 || index >= values.length || values[index] == null) {
+        return null;
+      }
+      return getString(index, columnTypeDeduplicatedList.get(index), values);
+    }
+
+    protected String getString(int index, TSDataType tsDataType, byte[][] 
values) {
+      switch (tsDataType) {
+        case BOOLEAN:
+          return String.valueOf(BytesUtils.bytesToBool(values[index]));
+        case INT32:
+          return String.valueOf(BytesUtils.bytesToInt(values[index]));
+        case INT64:
+          return String.valueOf(BytesUtils.bytesToLong(values[index]));
+        case FLOAT:
+          return String.valueOf(BytesUtils.bytesToFloat(values[index]));
+        case DOUBLE:
+          return String.valueOf(BytesUtils.bytesToDouble(values[index]));
+        case TEXT:
+          return new String(values[index]);
+        default:
+          return null;
+      }
+    }
+
+    private void checkRecord() throws StatementExecutionException {
+      if (Objects.isNull(tsQueryDataSet)) {
+        throw new StatementExecutionException("No record remains");
+      }
+    }
+  }
+
+  private String findColumnNameByIndex(int columnIndex) throws 
StatementExecutionException {
+    if (columnIndex <= 0) {
+      throw new StatementExecutionException("column index should start from 
1");
+    }
+    if (columnIndex > columnNameList.size()) {
+      throw new StatementExecutionException(
+          String.format("column index %d out of range %d", columnIndex, 
columnNameList.size()));
+    }
+    return columnNameList.get(columnIndex - 1);
+  }
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java
index c1cc7e2..b8fa8d6 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java
@@ -43,11 +43,11 @@ public class SessionDataSetWrapper {
   }
 
   public int getBatchSize() {
-    return sessionDataSet.getBatchSize();
+    return sessionDataSet.getFetchSize();
   }
 
   public void setBatchSize(int batchSize) {
-    sessionDataSet.setBatchSize(batchSize);
+    sessionDataSet.setFetchSize(batchSize);
   }
 
   /**
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
index 1940e35..8cfb4d1 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
@@ -603,7 +603,7 @@ public class IoTDBSessionIT {
       throws StatementExecutionException, IoTDBConnectionException {
     SessionDataSet sessionDataSet = session
         .executeQueryStatement("select '11', s1, '11' from root.sg1.d1 align 
by device");
-    sessionDataSet.setBatchSize(1024);
+    sessionDataSet.setFetchSize(1024);
     int count = 0;
     while (sessionDataSet.hasNext()) {
       count++;
@@ -622,7 +622,7 @@ public class IoTDBSessionIT {
       throws IoTDBConnectionException, StatementExecutionException {
     SessionDataSet sessionDataSet = session.executeQueryStatement(
         "select '11', s1, '11', s5, s1, s5 from root.sg1.d1 align by device");
-    sessionDataSet.setBatchSize(1024);
+    sessionDataSet.setFetchSize(1024);
     int count = 0;
     while (sessionDataSet.hasNext()) {
       count++;
@@ -705,7 +705,7 @@ public class IoTDBSessionIT {
 
   private void query4() throws IoTDBConnectionException, 
StatementExecutionException {
     SessionDataSet sessionDataSet = session.executeQueryStatement("select * 
from root.sg1.d2");
-    sessionDataSet.setBatchSize(1024);
+    sessionDataSet.setFetchSize(1024);
     int count = 0;
     while (sessionDataSet.hasNext()) {
       long index = 1;
@@ -722,7 +722,7 @@ public class IoTDBSessionIT {
 
   private void query3() throws IoTDBConnectionException, 
StatementExecutionException {
     SessionDataSet sessionDataSet = session.executeQueryStatement("select * 
from root.sg1.d1");
-    sessionDataSet.setBatchSize(1024);
+    sessionDataSet.setFetchSize(1024);
     int count = 0;
     while (sessionDataSet.hasNext()) {
       long index = 1;
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java
new file mode 100644
index 0000000..8d3b3fd
--- /dev/null
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.SessionDataSet.DataIterator;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IoTDBSessionIteratorIT {
+
+  private Session session;
+
+  @Before
+  public void setUp() throws Exception {
+    System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/");
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+    prepareData();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    session.close();
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void test() {
+    String[] retArray = new String[]{
+        "0,1,2.0,null",
+        "1,1,2.0,null",
+        "2,1,2.0,null",
+        "3,1,2.0,null",
+        "4,1,2.0,null",
+        "5,1,2.0,4.0",
+        "6,1,2.0,4.0",
+        "7,1,2.0,4.0",
+        "8,1,2.0,4.0",
+        "9,1,2.0,4.0",
+    };
+
+    try {
+      SessionDataSet sessionDataSet = session.executeQueryStatement("select * 
from root.sg1");
+      sessionDataSet.setFetchSize(1024);
+      DataIterator iterator = sessionDataSet.iterator();
+      int count = 0;
+      while (iterator.next()) {
+        String ans = String.format("%s,%s,%s,%s", iterator.getLong(1), 
iterator.getInt("root.sg1.d1.s1"),
+            iterator.getFloat(3), iterator.getString("root.sg1.d2.s1"));
+        assertEquals(retArray[count], ans);
+        count++;
+      }
+      assertEquals(retArray.length, count);
+      sessionDataSet.closeOperationHandle();
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  private void prepareData() throws IoTDBConnectionException, 
StatementExecutionException {
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+
+    session.setStorageGroup("root.sg1");
+    session.createTimeseries("root.sg1.d1.s1", TSDataType.INT32, 
TSEncoding.RLE,
+        CompressionType.SNAPPY);
+    session.createTimeseries("root.sg1.d1.s2", TSDataType.FLOAT, 
TSEncoding.RLE,
+        CompressionType.SNAPPY);
+    session.createTimeseries("root.sg1.d2.s1", TSDataType.DOUBLE, 
TSEncoding.RLE,
+        CompressionType.SNAPPY);
+    String deviceId = "root.sg1.d1";
+    List<String> measurements = new ArrayList<>();
+    measurements.add("s1");
+    measurements.add("s2");
+    for (long time = 0; time < 10; time++) {
+      List<String> values = new ArrayList<>();
+      values.add("1");
+      values.add("2");
+      session.insertRecord(deviceId, time, measurements, values);
+    }
+
+    deviceId = "root.sg1.d2";
+    measurements = new ArrayList<>();
+    measurements.add("s1");
+    for (long time = 5; time < 10; time++) {
+      List<String> values = new ArrayList<>();
+      values.add("4");
+      session.insertRecord(deviceId, time, measurements, values);
+    }
+  }
+
+}

Reply via email to