This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch SessionNext in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 81a8166e81899eaaf8781cb9bef248cf5bf12bd8 Author: JackieTien97 <[email protected]> AuthorDate: Wed Apr 29 16:20:24 2020 +0800 Add a jdbc-like way to fetch data in session --- .../main/java/org/apache/iotdb/SessionExample.java | 50 ++-- .../java/org/apache/iotdb/session/Session.java | 2 +- .../org/apache/iotdb/session/SessionDataSet.java | 326 +++++++++++++++++++-- .../iotdb/session/IoTDBSessionIteratorIT.java | 122 ++++++++ 4 files changed, 458 insertions(+), 42 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index 6aa0e25..03310d0 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -18,21 +18,21 @@ */ package org.apache.iotdb; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.iotdb.rpc.BatchExecutionException; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.Session; import org.apache.iotdb.session.SessionDataSet; +import org.apache.iotdb.session.SessionDataSet.DataIterator; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -import org.apache.iotdb.tsfile.write.schema.Schema; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; public class SessionExample { @@ -49,7 +49,6 @@ public class SessionExample { if (!e.getMessage().contains("StorageGroupAlreadySetException")) { throw e; } -// ignore duplicated set } createTimeseries(); @@ -60,6 +59,7 @@ public class SessionExample { insertRecords(); nonQuery(); query(); + queryByIterator(); deleteData(); deleteTimeseries(); session.close(); @@ -95,7 +95,8 @@ public class SessionExample { private static void createMultiTimeseries() throws IoTDBConnectionException, BatchExecutionException { - if (!session.checkTimeseriesExists("root.sg1.d2.s1") && !session.checkTimeseriesExists("root.sg1.d2.s2")) { + if (!session.checkTimeseriesExists("root.sg1.d2.s1") && !session + .checkTimeseriesExists("root.sg1.d2.s2")) { List<String> paths = new ArrayList<>(); paths.add("root.sg1.d2.s1"); paths.add("root.sg1.d2.s2"); @@ -126,8 +127,9 @@ public class SessionExample { alias.add("weight1"); alias.add("weight2"); - session.createMultiTimeseries(paths, tsDataTypes, tsEncodings, compressionTypes, null, tagsList, - attributesList, alias); + session + .createMultiTimeseries(paths, tsDataTypes, tsEncodings, compressionTypes, null, tagsList, + attributesList, alias); } } @@ -193,15 +195,11 @@ public class SessionExample { /** * insert the data of a device. For each timestamp, the number of measurements is the same. - * + * <p> * a Tablet example: - * - * device1 - * time s1, s2, s3 - * 1, 1, 1, 1 - * 2, 2, 2, 2 - * 3, 3, 3, 3 - * + * <p> + * device1 time s1, s2, s3 1, 1, 1, 1 2, 2, 2, 2 3, 3, 3, 3 + * <p> * Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize */ private static void insertTablet() throws IoTDBConnectionException, BatchExecutionException { @@ -245,7 +243,7 @@ public class SessionExample { Tablet tablet1 = new Tablet("root.sg1.d1", schemaList, 100); Tablet tablet2 = new Tablet("root.sg1.d2", schemaList, 100); Tablet tablet3 = new Tablet("root.sg1.d3", schemaList, 100); - + Map<String, Tablet> tabletMap = new HashMap<>(); tabletMap.put("root.sg1.d1", tablet1); tabletMap.put("root.sg1.d2", tablet2); @@ -317,6 +315,22 @@ public class SessionExample { dataSet.closeOperationHandle(); } + private static void queryByIterator() + throws IoTDBConnectionException, StatementExecutionException { + SessionDataSet dataSet; + dataSet = session.executeQueryStatement("select * from root.sg1.d1"); + DataIterator iterator = dataSet.iterator(); + System.out.println(dataSet.getColumnNames()); + dataSet.setBatchSize(1024); // default is 512 + while (iterator.next()) { + System.out.println(String.format("%s,%s,%s,%s,%s", iterator.getLong(1), iterator.getLong(2), + iterator.getLong("root.sg1.d1.s2"), iterator.getLong(4), + iterator.getObject("root.sg1.d1.s4"))); + } + + dataSet.closeOperationHandle(); + } + private static void nonQuery() throws IoTDBConnectionException, StatementExecutionException { session.executeNonQueryStatement("insert into root.sg1.d1(timestamp,s1) values(200, 1);"); } diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java index 0407093..15635e5 100644 --- a/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/session/src/main/java/org/apache/iotdb/session/Session.java @@ -651,7 +651,7 @@ public class Session { } RpcUtils.verifySuccess(execResp.getStatus()); - return new SessionDataSet(sql, execResp.getColumns(), execResp.getDataTypeList(), + return new SessionDataSet(sql, execResp.getColumns(), execResp.getDataTypeList(), execResp.columnNameIndexMap, execResp.getQueryId(), client, sessionId, execResp.queryDataSet); } diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java index 763922d..fdfaf1a 100644 --- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java +++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java @@ -19,11 +19,12 @@ package org.apache.iotdb.session; import java.nio.ByteBuffer; -import java.sql.SQLException; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.StatementExecutionException; @@ -39,10 +40,15 @@ import org.apache.iotdb.tsfile.read.common.Field; import org.apache.iotdb.tsfile.read.common.RowRecord; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.BytesUtils; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.apache.thrift.TException; public class SessionDataSet { + private static final String TIMESTAMP_STR = "Time"; + private static final int START_INDEX = 2; + private static final String VALUE_IS_NULL = "The value got by %s (column name) is NULL."; + private boolean hasCachedRecord = false; private String sql; private long queryId; @@ -50,11 +56,9 @@ public class SessionDataSet { private TSIService.Iface client; private int batchSize = 1024; private List<String> columnNameList; - private List<String> columnTypeDeduplicatedList; - // duplicated column index -> origin index - Map<Integer, Integer> duplicateLocation; + protected List<TSDataType> columnTypeDeduplicatedList; // deduplicated from columnTypeList // column name -> column location - Map<String, Integer> columnMap; + Map<String, Integer> columnOrdinalMap; // column size int columnSize = 0; @@ -65,8 +69,11 @@ public class SessionDataSet { private byte[] currentBitmap; // used to cache the current bitmap for every column private static final int flag = 0x80; // used to do `or` operation with bitmap to judge whether the value is null + private byte[] time; // used to cache the current time value + private byte[][] values; // used to cache the current row record value + - public SessionDataSet(String sql, List<String> columnNameList, List<String> columnTypeList, + public SessionDataSet(String sql, List<String> columnNameList, List<String> columnTypeList, Map<String, Integer> columnNameIndex, long queryId, TSIService.Iface client, long sessionId, TSQueryDataSet queryDataSet) { this.sessionId = sessionId; this.sql = sql; @@ -76,22 +83,42 @@ public class SessionDataSet { currentBitmap = new byte[columnNameList.size()]; columnSize = columnNameList.size(); - // deduplicate columnTypeList according to columnNameList - this.columnTypeDeduplicatedList = new ArrayList<>(); - // duplicated column index -> origin index - duplicateLocation = new HashMap<>(); - // column name -> column location - columnMap = new HashMap<>(); - for (int i = 0; i < columnNameList.size(); i++) { - String name = columnNameList.get(i); - if (columnMap.containsKey(name)) { - duplicateLocation.put(i, columnMap.get(name)); - } else { - columnMap.put(name, i); - columnTypeDeduplicatedList.add(columnTypeList.get(i)); + this.columnNameList = new ArrayList<>(); + this.columnNameList.add(TIMESTAMP_STR); + // deduplicate and map + this.columnOrdinalMap = new HashMap<>(); + this.columnOrdinalMap.put(TIMESTAMP_STR, 1); + + // deduplicate and map + if (columnNameIndex != null) { + this.columnTypeDeduplicatedList = new ArrayList<>(columnNameIndex.size()); + for (int i = 0; i < columnNameIndex.size(); i++) { + columnTypeDeduplicatedList.add(null); + } + for (int i = 0; i < columnNameList.size(); i++) { + String name = columnNameList.get(i); + this.columnNameList.add(name); + if (!columnOrdinalMap.containsKey(name)) { + int index = columnNameIndex.get(name); + columnOrdinalMap.put(name, index+START_INDEX); + columnTypeDeduplicatedList.set(index, TSDataType.valueOf(columnTypeList.get(i))); + } + } + } else { + this.columnTypeDeduplicatedList = new ArrayList<>(); + int index = START_INDEX; + for (int i = 0; i < columnNameList.size(); i++) { + String name = columnNameList.get(i); + this.columnNameList.add(name); + if (!columnOrdinalMap.containsKey(name)) { + columnOrdinalMap.put(name, index++); + columnTypeDeduplicatedList.add(TSDataType.valueOf(columnTypeList.get(i))); + } } } + time = new byte[Long.BYTES]; + values = new byte[columnNameList.size()][]; this.tsQueryDataSet = queryDataSet; } @@ -142,9 +169,9 @@ public class SessionDataSet { int loc = 0; for (int i = 0; i < columnSize; i++) { Field field; - - if (duplicateLocation.containsKey(i)) { - field = Field.copy(outFields.get(duplicateLocation.get(i))); + int deduplicatedIndex = columnOrdinalMap.get(columnNameList.get(i+1)) - START_INDEX; + if (deduplicatedIndex < i) { + field = Field.copy(outFields.get(deduplicatedIndex)); } else { ByteBuffer bitmapBuffer = tsQueryDataSet.bitmapList.get(loc); // another new 8 row, should move the bitmap buffer position to next byte @@ -154,7 +181,7 @@ public class SessionDataSet { if (!isNull(loc, rowsIndex)) { ByteBuffer valueBuffer = tsQueryDataSet.valueList.get(loc); - TSDataType dataType = TSDataType.valueOf(columnTypeDeduplicatedList.get(loc)); + TSDataType dataType = columnTypeDeduplicatedList.get(loc); field = new Field(dataType); switch (dataType) { case BOOLEAN: @@ -232,4 +259,257 @@ public class SessionDataSet { "Error occurs when connecting to server for close operation, because: " + e, e); } } + + public DataIterator iterator() { + return new DataIterator(); + } + + public class DataIterator { + + private boolean emptyResultSet = false; + + public boolean next() throws StatementExecutionException { + if (hasCachedResults()) { + constructOneRow(); + return true; + } + if (emptyResultSet) { + return false; + } + if (fetchResults()) { + constructOneRow(); + return true; + } + return false; + } + + private boolean fetchResults() throws StatementExecutionException { + rowsIndex = 0; + TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, batchSize, queryId, true); + try { + TSFetchResultsResp resp = client.fetchResults(req); + + RpcUtils.verifySuccess(resp.getStatus()); + if (!resp.hasResultSet) { + emptyResultSet = true; + } else { + tsQueryDataSet = resp.getQueryDataSet(); + } + return resp.hasResultSet; + } catch (TException e) { + throw new StatementExecutionException( + "Cannot fetch result from server, because of network connection: {} ", e); + } + } + + private void constructOneRow() { + tsQueryDataSet.time.get(time); + for (int i = 0; i < tsQueryDataSet.bitmapList.size(); i++) { + ByteBuffer bitmapBuffer = tsQueryDataSet.bitmapList.get(i); + // another new 8 row, should move the bitmap buffer position to next byte + if (rowsIndex % 8 == 0) { + currentBitmap[i] = bitmapBuffer.get(); + } + values[i] = null; + if (!isNull(i, rowsIndex)) { + ByteBuffer valueBuffer = tsQueryDataSet.valueList.get(i); + TSDataType dataType = columnTypeDeduplicatedList.get(i); + switch (dataType) { + case BOOLEAN: + if (values[i] == null) { + values[i] = new byte[1]; + } + valueBuffer.get(values[i]); + break; + case INT32: + if (values[i] == null) { + values[i] = new byte[Integer.BYTES]; + } + valueBuffer.get(values[i]); + break; + case INT64: + if (values[i] == null) { + values[i] = new byte[Long.BYTES]; + } + valueBuffer.get(values[i]); + break; + case FLOAT: + if (values[i] == null) { + values[i] = new byte[Float.BYTES]; + } + valueBuffer.get(values[i]); + break; + case DOUBLE: + if (values[i] == null) { + values[i] = new byte[Double.BYTES]; + } + valueBuffer.get(values[i]); + break; + case TEXT: + int length = valueBuffer.getInt(); + values[i] = ReadWriteIOUtils.readBytes(valueBuffer, length); + break; + default: + throw new UnSupportedDataTypeException( + String.format("Data type %s is not supported.", columnTypeDeduplicatedList.get(i))); + } + } + } + rowsIndex++; + } + + private boolean hasCachedResults() { + return (tsQueryDataSet != null && tsQueryDataSet.time.hasRemaining()); + } + + public boolean getBoolean(int columnIndex) throws StatementExecutionException { + return getBoolean(findColumnNameByIndex(columnIndex)); + } + + public boolean getBoolean(String columnName) throws StatementExecutionException { + checkRecord(); + int index = columnOrdinalMap.get(columnName) - START_INDEX; + if (values[index] != null) { + return BytesUtils.bytesToBool(values[index]); + } + else { + throw new StatementExecutionException(String.format(VALUE_IS_NULL, columnName)); + } + } + + public double getDouble(int columnIndex) throws StatementExecutionException { + return getDouble(findColumnNameByIndex(columnIndex)); + } + + public double getDouble(String columnName) throws StatementExecutionException { + checkRecord(); + int index = columnOrdinalMap.get(columnName) - START_INDEX; + if (values[index] != null) { + return BytesUtils.bytesToDouble(values[index]); + } else { + throw new StatementExecutionException(String.format(VALUE_IS_NULL, columnName)); + } + } + + public float getFloat(int columnIndex) throws StatementExecutionException { + return getFloat(findColumnNameByIndex(columnIndex)); + } + + public float getFloat(String columnName) throws StatementExecutionException { + checkRecord(); + int index = columnOrdinalMap.get(columnName) - START_INDEX; + if (values[index] != null) { + return BytesUtils.bytesToFloat(values[index]); + } else { + throw new StatementExecutionException(String.format(VALUE_IS_NULL, columnName)); + } + } + + public int getInt(int columnIndex) throws StatementExecutionException { + return getInt(findColumnNameByIndex(columnIndex)); + } + + public int getInt(String columnName) throws StatementExecutionException { + checkRecord(); + int index = columnOrdinalMap.get(columnName) - START_INDEX; + if (values[index] != null) { + return BytesUtils.bytesToInt(values[index]); + } else { + throw new StatementExecutionException(String.format(VALUE_IS_NULL, columnName)); + } + } + + public long getLong(int columnIndex) throws StatementExecutionException { + return getLong(findColumnNameByIndex(columnIndex)); + } + + public long getLong(String columnName) throws StatementExecutionException { + checkRecord(); + if (columnName.equals(TIMESTAMP_STR)) { + return BytesUtils.bytesToLong(time); + } + int index = columnOrdinalMap.get(columnName) - START_INDEX; + if (values[index] != null) { + return BytesUtils.bytesToLong(values[index]); + } else { + throw new StatementExecutionException(String.format(VALUE_IS_NULL, columnName)); + } + } + + public Object getObject(int columnIndex) throws StatementExecutionException { + return getObject(findColumnNameByIndex(columnIndex)); + } + + public Object getObject(String columnName) throws StatementExecutionException { + return getValueByName(columnName); + } + + public String getString(int columnIndex) throws StatementExecutionException { + return getString(findColumnNameByIndex(columnIndex)); + } + + public String getString(String columnName) throws StatementExecutionException { + return getValueByName(columnName); + } + + public Timestamp getTimestamp(int columnIndex) throws StatementExecutionException { + return new Timestamp(getLong(columnIndex)); + } + + public Timestamp getTimestamp(String columnName) throws StatementExecutionException { + return getTimestamp(findColumn(columnName)); + } + + public int findColumn(String columnName) { + return columnOrdinalMap.get(columnName); + } + + private String getValueByName(String columnName) throws StatementExecutionException { + checkRecord(); + if (columnName.equals(TIMESTAMP_STR)) { + return String.valueOf(BytesUtils.bytesToLong(time)); + } + int index = columnOrdinalMap.get(columnName) - START_INDEX; + if (index < 0 || index >= values.length || values[index] == null) { + return null; + } + return getString(index, columnTypeDeduplicatedList.get(index), values); + } + + protected String getString(int index, TSDataType tsDataType, byte[][] values) { + switch (tsDataType) { + case BOOLEAN: + return String.valueOf(BytesUtils.bytesToBool(values[index])); + case INT32: + return String.valueOf(BytesUtils.bytesToInt(values[index])); + case INT64: + return String.valueOf(BytesUtils.bytesToLong(values[index])); + case FLOAT: + return String.valueOf(BytesUtils.bytesToFloat(values[index])); + case DOUBLE: + return String.valueOf(BytesUtils.bytesToDouble(values[index])); + case TEXT: + return new String(values[index]); + default: + return null; + } + } + + private void checkRecord() throws StatementExecutionException { + if (Objects.isNull(tsQueryDataSet)) { + throw new StatementExecutionException("No record remains"); + } + } + } + + private String findColumnNameByIndex(int columnIndex) throws StatementExecutionException { + if (columnIndex <= 0) { + throw new StatementExecutionException("column index should start from 1"); + } + if (columnIndex > columnNameList.size()) { + throw new StatementExecutionException( + String.format("column index %d out of range %d", columnIndex, columnNameList.size())); + } + return columnNameList.get(columnIndex - 1); + } } diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java new file mode 100644 index 0000000..8d6ec3e --- /dev/null +++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.session; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.List; +import org.apache.iotdb.db.conf.IoTDBConstant; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.SessionDataSet.DataIterator; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class IoTDBSessionIteratorIT { + + private Session session; + + @Before + public void setUp() throws Exception { + System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/"); + EnvironmentUtils.closeStatMonitor(); + EnvironmentUtils.envSetUp(); + prepareData(); + } + + @After + public void tearDown() throws Exception { + session.close(); + EnvironmentUtils.cleanEnv(); + } + + @Test + public void test() { + String[] retArray = new String[]{ + "0,1,2.0,null", + "1,1,2.0,null", + "2,1,2.0,null", + "3,1,2.0,null", + "4,1,2.0,null", + "5,1,2.0,4.0", + "6,1,2.0,4.0", + "7,1,2.0,4.0", + "8,1,2.0,4.0", + "9,1,2.0,4.0", + }; + + try { + SessionDataSet sessionDataSet = session.executeQueryStatement("select * from root.sg1"); + sessionDataSet.setBatchSize(1024); + DataIterator iterator = sessionDataSet.iterator(); + int count = 0; + while (iterator.next()) { + String ans = String.format("%s,%s,%s,%s", iterator.getLong(1), iterator.getInt("root.sg1.d1.s1"), + iterator.getFloat(3), iterator.getString("root.sg1.d2.s1")); + assertEquals(retArray[count], ans); + count++; + } + assertEquals(retArray.length, count); + sessionDataSet.closeOperationHandle(); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + private void prepareData() throws IoTDBConnectionException, StatementExecutionException { + session = new Session("127.0.0.1", 6667, "root", "root"); + session.open(); + + session.setStorageGroup("root.sg1"); + session.createTimeseries("root.sg1.d1.s1", TSDataType.INT32, TSEncoding.RLE, + CompressionType.SNAPPY); + session.createTimeseries("root.sg1.d1.s2", TSDataType.FLOAT, TSEncoding.RLE, + CompressionType.SNAPPY); + session.createTimeseries("root.sg1.d2.s1", TSDataType.DOUBLE, TSEncoding.RLE, + CompressionType.SNAPPY); + String deviceId = "root.sg1.d1"; + List<String> measurements = new ArrayList<>(); + measurements.add("s1"); + measurements.add("s2"); + for (long time = 0; time < 10; time++) { + List<String> values = new ArrayList<>(); + values.add("1"); + values.add("2"); + session.insertRecord(deviceId, time, measurements, values); + } + + deviceId = "root.sg1.d2"; + measurements = new ArrayList<>(); + measurements.add("s1"); + for (long time = 5; time < 10; time++) { + List<String> values = new ArrayList<>(); + values.add("4"); + session.insertRecord(deviceId, time, measurements, values); + } + } + +}
