This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch RefactorDataSet in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit cfec271db9b7327f261e3686ee61bcc0f132c99d Author: JackieTien97 <[email protected]> AuthorDate: Sat May 9 16:22:37 2020 +0800 tmp --- .../apache/iotdb/jdbc/AbstractIoTDBResultSet.java | 277 ++++++-------- .../iotdb/jdbc/IoTDBNonAlignQueryResultSet.java | 100 ++--- .../org/apache/iotdb/jdbc/IoTDBQueryResultSet.java | 157 ++------ service-rpc/pom.xml | 5 + .../org/apache/iotdb/rpc/AbstractIoTDBDataSet.java | 404 +++++++++++++++++++++ .../org/apache/iotdb/session/SessionDataSet.java | 4 - 6 files changed, 591 insertions(+), 356 deletions(-) diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBResultSet.java index ca927d7..ef6558c 100644 --- a/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBResultSet.java +++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBResultSet.java @@ -19,100 +19,52 @@ package org.apache.iotdb.jdbc; -import org.apache.iotdb.rpc.RpcUtils; -import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq; -import org.apache.iotdb.service.rpc.thrift.TSIService; -import org.apache.iotdb.service.rpc.thrift.TSStatus; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.utils.BytesUtils; -import org.apache.thrift.TException; - import java.io.InputStream; import java.io.Reader; import java.math.BigDecimal; import java.math.MathContext; import java.net.URL; +import java.sql.Array; +import java.sql.Blob; +import java.sql.Clob; import java.sql.Date; -import java.sql.*; -import java.util.*; +import java.sql.NClob; +import java.sql.Ref; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.RowId; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Statement; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Calendar; +import java.util.List; +import java.util.Map; 
+import java.util.Objects; +import org.apache.iotdb.rpc.AbstractIoTDBDataSet; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.service.rpc.thrift.TSIService; +import org.apache.thrift.TException; public abstract class AbstractIoTDBResultSet implements ResultSet { - protected static final String TIMESTAMP_STR = "Time"; - protected static final String VALUE_IS_NULL = "The value got by %s (column name) is NULL."; - protected static final int START_INDEX = 2; protected Statement statement; - protected String sql; protected SQLWarning warningChain = null; - protected boolean isClosed = false; - protected TSIService.Iface client; - protected List<String> columnNameList; // no deduplication - protected List<String> columnTypeList; // no deduplication - protected Map<String, Integer> columnOrdinalMap; // used because the server returns deduplicated columns - protected List<TSDataType> columnTypeDeduplicatedList; // deduplicated from columnTypeList - protected int fetchSize; - protected boolean emptyResultSet = false; - - - protected byte[][] values; // used to cache the current row record value - - - protected long sessionId; - protected long queryId; - protected boolean ignoreTimeStamp; - - + protected List<String> columnTypeList; + protected AbstractIoTDBDataSet abstractIoTDBDataSet; public AbstractIoTDBResultSet(Statement statement, List<String> columnNameList, - List<String> columnTypeList, Map<String, Integer> columnNameIndex, boolean ignoreTimeStamp, TSIService.Iface client, - String sql, long queryId, long sessionId) - throws SQLException { + List<String> columnTypeList, Map<String, Integer> columnNameIndex, boolean ignoreTimeStamp, + TSIService.Iface client, + String sql, long queryId, long sessionId) + throws SQLException { + this.abstractIoTDBDataSet = new AbstractIoTDBDataSet(sql, columnNameList, columnTypeList, + columnNameIndex, ignoreTimeStamp, queryId, client, sessionId, null, + statement.getFetchSize()); this.statement = 
statement; - this.fetchSize = statement.getFetchSize(); this.columnTypeList = columnTypeList; - values = new byte[columnNameList.size()][]; - - this.columnNameList = new ArrayList<>(); - if(!ignoreTimeStamp) { - this.columnNameList.add(TIMESTAMP_STR); - } - // deduplicate and map - this.columnOrdinalMap = new HashMap<>(); - if(!ignoreTimeStamp) { - this.columnOrdinalMap.put(TIMESTAMP_STR, 1); - } - if (columnNameIndex != null) { - this.columnTypeDeduplicatedList = new ArrayList<>(columnNameIndex.size()); - for (int i = 0; i < columnNameIndex.size(); i++) { - columnTypeDeduplicatedList.add(null); - } - for (int i = 0; i < columnNameList.size(); i++) { - String name = columnNameList.get(i); - this.columnNameList.add(name); - if (!columnOrdinalMap.containsKey(name)) { - int index = columnNameIndex.get(name); - columnOrdinalMap.put(name, index+START_INDEX); - columnTypeDeduplicatedList.set(index, TSDataType.valueOf(columnTypeList.get(i))); - } - } - } else { - this.columnTypeDeduplicatedList = new ArrayList<>(); - int index = START_INDEX; - for (int i = 0; i < columnNameList.size(); i++) { - String name = columnNameList.get(i); - this.columnNameList.add(name); - if (!columnOrdinalMap.containsKey(name)) { - columnOrdinalMap.put(name, index++); - columnTypeDeduplicatedList.add(TSDataType.valueOf(columnTypeList.get(i))); - } - } - } - this.ignoreTimeStamp = ignoreTimeStamp; - this.client = client; - this.sql = sql; - this.queryId = queryId; - this.sessionId = sessionId; } @Override @@ -152,23 +104,13 @@ public abstract class AbstractIoTDBResultSet implements ResultSet { @Override public void close() throws SQLException { - if (isClosed) { - return; - } - if (client != null) { - try { - TSCloseOperationReq closeReq = new TSCloseOperationReq(sessionId); - closeReq.setQueryId(queryId); - TSStatus closeResp = client.closeOperation(closeReq); - RpcUtils.verifySuccess(closeResp); - } catch (StatementExecutionException e) { - throw new SQLException("Error occurs for close 
operation in server side because ", e); - } catch (TException e) { - throw new SQLException("Error occurs when connecting to server for close operation ", e); - } + try { + abstractIoTDBDataSet.close(); + } catch (StatementExecutionException e) { + throw new SQLException("Error occurs for close operation in server side because ", e); + } catch (TException e) { + throw new SQLException("Error occurs when connecting to server for close operation ", e); } - client = null; - isClosed = true; } @@ -179,7 +121,7 @@ public abstract class AbstractIoTDBResultSet implements ResultSet { @Override public int findColumn(String columnName) { - return columnOrdinalMap.get(columnName); + return abstractIoTDBDataSet.findColumn(columnName); } @Override @@ -209,7 +151,11 @@ public abstract class AbstractIoTDBResultSet implements ResultSet { @Override public BigDecimal getBigDecimal(int columnIndex) throws SQLException { - return getBigDecimal(findColumnNameByIndex(columnIndex)); + try { + return getBigDecimal(abstractIoTDBDataSet.findColumnNameByIndex(columnIndex)); + } catch (StatementExecutionException e) { + throw new SQLException(e.getMessage()); + } } @Override @@ -250,18 +196,19 @@ public abstract class AbstractIoTDBResultSet implements ResultSet { @Override public boolean getBoolean(int columnIndex) throws SQLException { - return getBoolean(findColumnNameByIndex(columnIndex)); + try { + return getBoolean(abstractIoTDBDataSet.findColumnNameByIndex(columnIndex)); + } catch (StatementExecutionException e) { + throw new SQLException(e.getMessage()); + } } @Override public boolean getBoolean(String columnName) throws SQLException { - checkRecord(); - int index = columnOrdinalMap.get(columnName) - START_INDEX; - if (values[index] != null) { - return BytesUtils.bytesToBool(values[index]); - } - else { - throw new SQLException(String.format(VALUE_IS_NULL, columnName)); + try { + return abstractIoTDBDataSet.getBoolean(columnName); + } catch (StatementExecutionException e) { + throw new 
SQLException(e.getMessage()); } } @@ -337,17 +284,19 @@ public abstract class AbstractIoTDBResultSet implements ResultSet { @Override public double getDouble(int columnIndex) throws SQLException { - return getDouble(findColumnNameByIndex(columnIndex)); + try { + return getDouble(abstractIoTDBDataSet.findColumnNameByIndex(columnIndex)); + } catch (StatementExecutionException e) { + throw new SQLException(e.getMessage()); + } } @Override public double getDouble(String columnName) throws SQLException { - checkRecord(); - int index = columnOrdinalMap.get(columnName) - START_INDEX; - if (values[index] != null) { - return BytesUtils.bytesToDouble(values[index]); - } else { - throw new SQLException(String.format(VALUE_IS_NULL, columnName)); + try { + return abstractIoTDBDataSet.getDouble(columnName); + } catch (StatementExecutionException e) { + throw new SQLException(e.getMessage()); } } @@ -373,17 +322,19 @@ public abstract class AbstractIoTDBResultSet implements ResultSet { @Override public float getFloat(int columnIndex) throws SQLException { - return getFloat(findColumnNameByIndex(columnIndex)); + try { + return getInt(abstractIoTDBDataSet.findColumnNameByIndex(columnIndex)); + } catch (StatementExecutionException e) { + throw new SQLException(e.getMessage()); + } } @Override public float getFloat(String columnName) throws SQLException { - checkRecord(); - int index = columnOrdinalMap.get(columnName) - START_INDEX; - if (values[index] != null) { - return BytesUtils.bytesToFloat(values[index]); - } else { - throw new SQLException(String.format(VALUE_IS_NULL, columnName)); + try { + return abstractIoTDBDataSet.getFloat(columnName); + } catch (StatementExecutionException e) { + throw new SQLException(e.getMessage()); } } @@ -394,23 +345,29 @@ public abstract class AbstractIoTDBResultSet implements ResultSet { @Override public int getInt(int columnIndex) throws SQLException { - return getInt(findColumnNameByIndex(columnIndex)); + try { + return 
getInt(abstractIoTDBDataSet.findColumnNameByIndex(columnIndex)); + } catch (StatementExecutionException e) { + throw new SQLException(e.getMessage()); + } } @Override public int getInt(String columnName) throws SQLException { - checkRecord(); - int index = columnOrdinalMap.get(columnName) - START_INDEX; - if (values[index] != null) { - return BytesUtils.bytesToInt(values[index]); - } else { - throw new SQLException(String.format(VALUE_IS_NULL, columnName)); + try { + return abstractIoTDBDataSet.getInt(columnName); + } catch (StatementExecutionException e) { + throw new SQLException(e.getMessage()); } } @Override public long getLong(int columnIndex) throws SQLException { - return getLong(findColumnNameByIndex(columnIndex)); + try { + return getLong(abstractIoTDBDataSet.findColumnNameByIndex(columnIndex)); + } catch (StatementExecutionException e) { + throw new SQLException(e.getMessage()); + } } @Override @@ -418,7 +375,7 @@ public abstract class AbstractIoTDBResultSet implements ResultSet { @Override public ResultSetMetaData getMetaData() { - return new IoTDBResultMetadata(columnNameList, columnTypeList, ignoreTimeStamp); + return new IoTDBResultMetadata(abstractIoTDBDataSet.columnNameList, columnTypeList, abstractIoTDBDataSet.ignoreTimeStamp); } @Override @@ -453,7 +410,11 @@ public abstract class AbstractIoTDBResultSet implements ResultSet { @Override public Object getObject(int columnIndex) throws SQLException { - return getObject(findColumnNameByIndex(columnIndex)); + try { + return getObject(abstractIoTDBDataSet.findColumnNameByIndex(columnIndex)); + } catch (StatementExecutionException e) { + throw new SQLException(e.getMessage()); + } } @Override @@ -518,7 +479,11 @@ public abstract class AbstractIoTDBResultSet implements ResultSet { @Override public short getShort(int columnIndex) throws SQLException { - return getShort(findColumnNameByIndex(columnIndex)); + try { + return getShort(abstractIoTDBDataSet.findColumnNameByIndex(columnIndex)); + } catch 
(StatementExecutionException e) { + throw new SQLException(e.getMessage()); + } } @Override @@ -533,7 +498,11 @@ public abstract class AbstractIoTDBResultSet implements ResultSet { @Override public String getString(int columnIndex) throws SQLException { - return getString(findColumnNameByIndex(columnIndex)); + try { + return getString(abstractIoTDBDataSet.findColumnNameByIndex(columnIndex)); + } catch (StatementExecutionException e) { + throw new SQLException(e.getMessage()); + } } @Override @@ -628,7 +597,7 @@ public abstract class AbstractIoTDBResultSet implements ResultSet { @Override public boolean isClosed() { - return isClosed; + return abstractIoTDBDataSet.isClosed; } @Override @@ -658,18 +627,11 @@ public abstract class AbstractIoTDBResultSet implements ResultSet { @Override public boolean next() throws SQLException { - if (hasCachedResults()) { - constructOneRow(); - return true; - } - if (emptyResultSet) { - return false; - } - if (fetchResults()) { - constructOneRow(); - return true; + try { + return abstractIoTDBDataSet.next(); + } catch (StatementExecutionException e) { + throw new SQLException(e.getMessage()); } - return false; } @@ -1135,35 +1097,6 @@ public abstract class AbstractIoTDBResultSet implements ResultSet { abstract void checkRecord() throws SQLException; - private String findColumnNameByIndex(int columnIndex) throws SQLException { - if (columnIndex <= 0) { - throw new SQLException("column index should start from 1"); - } - if (columnIndex > columnNameList.size()) { - throw new SQLException( - String.format("column index %d out of range %d", columnIndex, columnNameList.size())); - } - return columnNameList.get(columnIndex - 1); - } - abstract String getValueByName(String columnName) throws SQLException; - protected String getString(int index, TSDataType tsDataType, byte[][] values) { - switch (tsDataType) { - case BOOLEAN: - return String.valueOf(BytesUtils.bytesToBool(values[index])); - case INT32: - return 
String.valueOf(BytesUtils.bytesToInt(values[index])); - case INT64: - return String.valueOf(BytesUtils.bytesToLong(values[index])); - case FLOAT: - return String.valueOf(BytesUtils.bytesToFloat(values[index])); - case DOUBLE: - return String.valueOf(BytesUtils.bytesToDouble(values[index])); - case TEXT: - return new String(values[index]); - default: - return null; - } - } } diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignQueryResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignQueryResultSet.java index d3484c2..28e959b 100644 --- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignQueryResultSet.java +++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignQueryResultSet.java @@ -19,6 +19,10 @@ package org.apache.iotdb.jdbc; +import static org.apache.iotdb.rpc.AbstractIoTDBDataSet.START_INDEX; +import static org.apache.iotdb.rpc.AbstractIoTDBDataSet.TIMESTAMP_STR; +import static org.apache.iotdb.rpc.AbstractIoTDBDataSet.VALUE_IS_NULL; + import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq; @@ -46,30 +50,33 @@ public class IoTDBNonAlignQueryResultSet extends AbstractIoTDBResultSet { // for disable align clause IoTDBNonAlignQueryResultSet(Statement statement, List<String> columnNameList, - List<String> columnTypeList, Map<String, Integer> columnNameIndex, boolean ignoreTimeStamp, TSIService.Iface client, - String sql, long queryId, long sessionId, TSQueryNonAlignDataSet dataset) - throws SQLException { - super(statement, columnNameList, columnTypeList, columnNameIndex, ignoreTimeStamp, client, sql, queryId, sessionId); + List<String> columnTypeList, Map<String, Integer> columnNameIndex, boolean ignoreTimeStamp, + TSIService.Iface client, + String sql, long queryId, long sessionId, TSQueryNonAlignDataSet dataset) + throws SQLException { + super(statement, columnNameList, columnTypeList, columnNameIndex, ignoreTimeStamp, 
client, sql, + queryId, sessionId); times = new byte[columnNameList.size()][Long.BYTES]; - super.columnNameList = new ArrayList<>(); + abstractIoTDBDataSet.columnNameList = new ArrayList<>(); // deduplicate and map - super.columnOrdinalMap = new HashMap<>(); - super.columnOrdinalMap.put(TIMESTAMP_STR, 1); - super.columnTypeDeduplicatedList = new ArrayList<>(); - super.columnTypeDeduplicatedList = new ArrayList<>(columnNameIndex.size()); + abstractIoTDBDataSet.columnOrdinalMap = new HashMap<>(); + abstractIoTDBDataSet.columnOrdinalMap.put(TIMESTAMP_STR, 1); + abstractIoTDBDataSet.columnTypeDeduplicatedList = new ArrayList<>(); + abstractIoTDBDataSet.columnTypeDeduplicatedList = new ArrayList<>(columnNameIndex.size()); for (int i = 0; i < columnNameIndex.size(); i++) { - super.columnTypeDeduplicatedList.add(null); + abstractIoTDBDataSet.columnTypeDeduplicatedList.add(null); } for (int i = 0; i < columnNameList.size(); i++) { String name = columnNameList.get(i); - super.columnNameList.add(TIMESTAMP_STR + name); - super.columnNameList.add(name); - if (!columnOrdinalMap.containsKey(name)) { + abstractIoTDBDataSet.columnNameList.add(TIMESTAMP_STR + name); + abstractIoTDBDataSet.columnNameList.add(name); + if (!abstractIoTDBDataSet.columnOrdinalMap.containsKey(name)) { int index = columnNameIndex.get(name); - columnOrdinalMap.put(name, index+START_INDEX); - columnTypeDeduplicatedList.set(index, TSDataType.valueOf(columnTypeList.get(i))); + abstractIoTDBDataSet.columnOrdinalMap.put(name, index + START_INDEX); + abstractIoTDBDataSet.columnTypeDeduplicatedList + .set(index, TSDataType.valueOf(columnTypeList.get(i))); } } this.tsQueryNonAlignDataSet = dataset; @@ -80,15 +87,16 @@ public class IoTDBNonAlignQueryResultSet extends AbstractIoTDBResultSet { checkRecord(); if (columnName.startsWith(TIMESTAMP_STR)) { String column = columnName.substring(TIMESTAMP_STR_LENGTH); - int index = columnOrdinalMap.get(column) - START_INDEX; - if (times[index] != null) + int index = 
abstractIoTDBDataSet.columnOrdinalMap.get(column) - START_INDEX; + if (times[index] != null) { return BytesUtils.bytesToLong(times[index]); - else + } else { throw new SQLException(String.format(VALUE_IS_NULL, columnName)); + } } - int index = columnOrdinalMap.get(columnName) - START_INDEX; - if (values[index] != null) { - return BytesUtils.bytesToLong(values[index]); + int index = abstractIoTDBDataSet.columnOrdinalMap.get(columnName) - START_INDEX; + if (abstractIoTDBDataSet.values[index] != null) { + return BytesUtils.bytesToLong(abstractIoTDBDataSet.values[index]); } else { throw new SQLException(String.format(VALUE_IS_NULL, columnName)); } @@ -96,9 +104,11 @@ public class IoTDBNonAlignQueryResultSet extends AbstractIoTDBResultSet { @Override protected boolean fetchResults() throws SQLException { - TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, false); + TSFetchResultsReq req = new TSFetchResultsReq(abstractIoTDBDataSet.sessionId, + abstractIoTDBDataSet.sql, abstractIoTDBDataSet.fetchSize, abstractIoTDBDataSet.queryId, + false); try { - TSFetchResultsResp resp = client.fetchResults(req); + TSFetchResultsResp resp = abstractIoTDBDataSet.client.fetchResults(req); try { RpcUtils.verifySuccess(resp.getStatus()); @@ -106,7 +116,7 @@ public class IoTDBNonAlignQueryResultSet extends AbstractIoTDBResultSet { throw new IoTDBSQLException(e.getMessage(), resp.getStatus()); } if (!resp.hasResultSet) { - emptyResultSet = true; + abstractIoTDBDataSet.emptyResultSet = true; } else { tsQueryNonAlignDataSet = resp.getNonAlignQueryDataSet(); if (tsQueryNonAlignDataSet == null) { @@ -116,7 +126,7 @@ public class IoTDBNonAlignQueryResultSet extends AbstractIoTDBResultSet { return resp.hasResultSet; } catch (TException e) { throw new SQLException( - "Cannot fetch result from server, because of network connection: {} ", e); + "Cannot fetch result from server, because of network connection: {} ", e); } } @@ -139,46 +149,46 @@ public class 
IoTDBNonAlignQueryResultSet extends AbstractIoTDBResultSet { protected void constructOneRow() { for (int i = 0; i < tsQueryNonAlignDataSet.timeList.size(); i++) { times[i] = null; - values[i] = null; + abstractIoTDBDataSet.values[i] = null; if (tsQueryNonAlignDataSet.timeList.get(i).remaining() >= Long.BYTES) { times[i] = new byte[Long.BYTES]; tsQueryNonAlignDataSet.timeList.get(i).get(times[i]); ByteBuffer valueBuffer = tsQueryNonAlignDataSet.valueList.get(i); - TSDataType dataType = columnTypeDeduplicatedList.get(i); + TSDataType dataType = abstractIoTDBDataSet.columnTypeDeduplicatedList.get(i); switch (dataType) { case BOOLEAN: - values[i] = new byte[1]; - valueBuffer.get(values[i]); + abstractIoTDBDataSet.values[i] = new byte[1]; + valueBuffer.get(abstractIoTDBDataSet.values[i]); break; case INT32: - values[i] = new byte[Integer.BYTES]; - valueBuffer.get(values[i]); + abstractIoTDBDataSet.values[i] = new byte[Integer.BYTES]; + valueBuffer.get(abstractIoTDBDataSet.values[i]); break; case INT64: - values[i] = new byte[Long.BYTES]; - valueBuffer.get(values[i]); + abstractIoTDBDataSet.values[i] = new byte[Long.BYTES]; + valueBuffer.get(abstractIoTDBDataSet.values[i]); break; case FLOAT: - values[i] = new byte[Float.BYTES]; - valueBuffer.get(values[i]); + abstractIoTDBDataSet.values[i] = new byte[Float.BYTES]; + valueBuffer.get(abstractIoTDBDataSet.values[i]); break; case DOUBLE: - values[i] = new byte[Double.BYTES]; - valueBuffer.get(values[i]); + abstractIoTDBDataSet.values[i] = new byte[Double.BYTES]; + valueBuffer.get(abstractIoTDBDataSet.values[i]); break; case TEXT: int length = valueBuffer.getInt(); - values[i] = ReadWriteIOUtils.readBytes(valueBuffer, length); + abstractIoTDBDataSet.values[i] = ReadWriteIOUtils.readBytes(valueBuffer, length); break; default: throw new UnSupportedDataTypeException( - String.format("Data type %s is not supported.", columnTypeDeduplicatedList.get(i))); + String.format("Data type %s is not supported.", + 
abstractIoTDBDataSet.columnTypeDeduplicatedList.get(i))); } - } - else { - values[i] = EMPTY_STR.getBytes(); + } else { + abstractIoTDBDataSet.values[i] = EMPTY_STR.getBytes(); } } } @@ -195,14 +205,14 @@ public class IoTDBNonAlignQueryResultSet extends AbstractIoTDBResultSet { checkRecord(); if (columnName.startsWith(TIMESTAMP_STR)) { String column = columnName.substring(TIMESTAMP_STR_LENGTH); - int index = columnOrdinalMap.get(column) - START_INDEX; + int index = abstractIoTDBDataSet.columnOrdinalMap.get(column) - START_INDEX; if (times[index] == null || times[index].length == 0) { return null; } return String.valueOf(BytesUtils.bytesToLong(times[index])); } - int index = columnOrdinalMap.get(columnName) - START_INDEX; - if (index < 0 || index >= values.length || values[index] == null || values[index].length < 1) { + int index = abstractIoTDBDataSet.columnOrdinalMap.get(columnName) - START_INDEX; + if (index < 0 || index >= abstractIoTDBDataSet.values.length || abstractIoTDBDataSet.values[index] == null || abstractIoTDBDataSet.values[index].length < 1) { return null; } return getString(index, columnTypeDeduplicatedList.get(index), values); diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java index 7eade21..68f2b81 100644 --- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java +++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java @@ -19,185 +19,72 @@ package org.apache.iotdb.jdbc; -import org.apache.iotdb.rpc.RpcUtils; -import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq; -import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp; -import org.apache.iotdb.service.rpc.thrift.TSIService; -import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet; -import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; -import 
org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.utils.BytesUtils; -import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import org.apache.thrift.TException; - -import java.nio.ByteBuffer; import java.sql.SQLException; import java.sql.Statement; import java.util.List; import java.util.Map; -import java.util.Objects; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.service.rpc.thrift.TSIService; +import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet; public class IoTDBQueryResultSet extends AbstractIoTDBResultSet { - private static final int START_INDEX = 2; - private static final String VALUE_IS_NULL = "The value got by %s (column name) is NULL."; - private int rowsIndex = 0; // used to record the row index in current TSQueryDataSet - private boolean align = true; - - private TSQueryDataSet tsQueryDataSet = null; - private byte[] time; // used to cache the current time value - private byte[] currentBitmap; // used to cache the current bitmap for every column - private static final int FLAG = 0x80; // used to do `and` operation with bitmap to judge whether the value is null - public IoTDBQueryResultSet(Statement statement, List<String> columnNameList, List<String> columnTypeList, Map<String, Integer> columnNameIndex, boolean ignoreTimeStamp, TSIService.Iface client, String sql, long queryId, long sessionId, TSQueryDataSet dataset) throws SQLException { super(statement, columnNameList, columnTypeList, columnNameIndex, ignoreTimeStamp, client, sql, queryId, sessionId); - time = new byte[Long.BYTES]; - currentBitmap = new byte[columnNameList.size()]; - this.tsQueryDataSet = dataset; + abstractIoTDBDataSet.setTsQueryDataSet(dataset); } @Override public long getLong(String columnName) throws SQLException { - checkRecord(); - if (columnName.equals(TIMESTAMP_STR)) { - return BytesUtils.bytesToLong(time); - } - int index = columnOrdinalMap.get(columnName) - START_INDEX; - if (values[index] != 
null) { - return BytesUtils.bytesToLong(values[index]); - } else { - throw new SQLException(String.format(VALUE_IS_NULL, columnName)); + try { + return abstractIoTDBDataSet.getLong(columnName); + } catch (StatementExecutionException e) { + throw new SQLException(e.getMessage()); } } @Override protected boolean fetchResults() throws SQLException { - rowsIndex = 0; - TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, align); try { - TSFetchResultsResp resp = client.fetchResults(req); - - try { - RpcUtils.verifySuccess(resp.getStatus()); - } catch (StatementExecutionException e) { - throw new IoTDBSQLException(e.getMessage(), resp.getStatus()); - } - if (!resp.hasResultSet) { - emptyResultSet = true; - } else { - tsQueryDataSet = resp.getQueryDataSet(); - } - return resp.hasResultSet; - } catch (TException e) { - throw new SQLException( - "Cannot fetch result from server, because of network connection: {} ", e); + return abstractIoTDBDataSet.fetchResults(); + } catch (StatementExecutionException e) { + throw new SQLException(e.getMessage()); } } @Override protected boolean hasCachedResults() { - return (tsQueryDataSet != null && tsQueryDataSet.time.hasRemaining()); + return abstractIoTDBDataSet.hasCachedResults(); } @Override protected void constructOneRow() { - tsQueryDataSet.time.get(time); - for (int i = 0; i < tsQueryDataSet.bitmapList.size(); i++) { - ByteBuffer bitmapBuffer = tsQueryDataSet.bitmapList.get(i); - // another new 8 row, should move the bitmap buffer position to next byte - if (rowsIndex % 8 == 0) { - currentBitmap[i] = bitmapBuffer.get(); - } - values[i] = null; - if (!isNull(i, rowsIndex)) { - ByteBuffer valueBuffer = tsQueryDataSet.valueList.get(i); - TSDataType dataType = columnTypeDeduplicatedList.get(i); - switch (dataType) { - case BOOLEAN: - if (values[i] == null) { - values[i] = new byte[1]; - } - valueBuffer.get(values[i]); - break; - case INT32: - if (values[i] == null) { - values[i] = new 
byte[Integer.BYTES]; - } - valueBuffer.get(values[i]); - break; - case INT64: - if (values[i] == null) { - values[i] = new byte[Long.BYTES]; - } - valueBuffer.get(values[i]); - break; - case FLOAT: - if (values[i] == null) { - values[i] = new byte[Float.BYTES]; - } - valueBuffer.get(values[i]); - break; - case DOUBLE: - if (values[i] == null) { - values[i] = new byte[Double.BYTES]; - } - valueBuffer.get(values[i]); - break; - case TEXT: - int length = valueBuffer.getInt(); - values[i] = ReadWriteIOUtils.readBytes(valueBuffer, length); - break; - default: - throw new UnSupportedDataTypeException( - String.format("Data type %s is not supported.", columnTypeDeduplicatedList.get(i))); - } - } - } - rowsIndex++; + abstractIoTDBDataSet.constructOneRow(); } - /** - * judge whether the specified column value is null in the current position - * - * @param index series index - * @param rowNum current position - */ - private boolean isNull(int index, int rowNum) { - byte bitmap = currentBitmap[index]; - int shift = rowNum % 8; - return ((FLAG >>> shift) & bitmap) == 0; - } @Override protected void checkRecord() throws SQLException { - if (Objects.isNull(tsQueryDataSet)) { - throw new SQLException("No record remains"); + try { + abstractIoTDBDataSet.checkRecord(); + } catch (StatementExecutionException e) { + throw new SQLException(e.getMessage()); } } @Override protected String getValueByName(String columnName) throws SQLException { - checkRecord(); - if (columnName.equals(TIMESTAMP_STR)) { - return String.valueOf(BytesUtils.bytesToLong(time)); - } - int index = columnOrdinalMap.get(columnName) - START_INDEX; - if (index < 0 || index >= values.length || values[index] == null) { - return null; + try { + return abstractIoTDBDataSet.getValueByName(columnName); + } catch (StatementExecutionException e) { + throw new SQLException(e.getMessage()); } - return getString(index, columnTypeDeduplicatedList.get(index), values); } public boolean isIgnoreTimeStamp() { - return 
ignoreTimeStamp; - } - - - public boolean isAlign() { - return align; + return abstractIoTDBDataSet.ignoreTimeStamp; } } diff --git a/service-rpc/pom.xml b/service-rpc/pom.xml index e257b46..fb0bf36 100644 --- a/service-rpc/pom.xml +++ b/service-rpc/pom.xml @@ -40,6 +40,11 @@ <groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId> </dependency> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>tsfile</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <build> <plugins> diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/AbstractIoTDBDataSet.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/AbstractIoTDBDataSet.java new file mode 100644 index 0000000..233c412 --- /dev/null +++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/AbstractIoTDBDataSet.java @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.iotdb.rpc;

import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
import org.apache.iotdb.service.rpc.thrift.TSIService;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.thrift.TException;

/**
 * Row cursor over the result of one query, backed by {@link TSQueryDataSet} batches.
 *
 * <p>The cursor decodes one row at a time from the current batch into {@link #time} and
 * {@link #values}. When the batch is exhausted, {@link #next()} asks the server for another one
 * through {@link TSIService.Iface#fetchResults}.
 *
 * <p>Two column numbering schemes coexist and must not be mixed:
 * <ul>
 *   <li>{@link #columnNameList} keeps every selected column, duplicates included, in selection
 *       order. 1-based positions into this list are what {@link #findColumnNameByIndex(int)}
 *       accepts.</li>
 *   <li>{@link #columnOrdinalMap} maps each distinct name to its ordinal in the deduplicated
 *       layout; {@code ordinal - START_INDEX} is the slot in {@link #values}.</li>
 * </ul>
 *
 * <p>All state is exposed through public mutable fields; nothing in this class synchronizes
 * access to them.
 */
public class AbstractIoTDBDataSet {

  /** Name under which the timestamp column is registered. */
  public static final String TIMESTAMP_STR = "Time";
  /** Message template used when a typed getter hits an absent value. */
  public static final String VALUE_IS_NULL = "The value got by %s (column name) is NULL.";
  /** Ordinal given to the first value column; ordinal 1 is used for the timestamp. */
  public static final int START_INDEX = 2;
  /** Mask for the first row of a bitmap byte; shifted right by {@code row % 8} per row. */
  public static final int FLAG = 0x80;

  public String sql;
  public boolean isClosed = false;
  public TSIService.Iface client;
  /** Selected column names, not deduplicated; starts with "Time" unless it is ignored. */
  public List<String> columnNameList;
  /** Distinct column name to ordinal in the deduplicated layout. */
  public Map<String, Integer> columnOrdinalMap;
  /** Data type of each deduplicated column, indexed by {@code ordinal - START_INDEX}. */
  public List<TSDataType> columnTypeDeduplicatedList;
  public int fetchSize;
  /** Set once the server reports that it has no further result set. */
  public boolean emptyResultSet = false;
  /** Set after a row has been decoded by {@link #constructOneRow()}. */
  public boolean hasCachedRecord = false;

  /** Raw bytes of the current row, one slot per column; {@code null} marks an absent value. */
  public byte[][] values;
  /** Number of column names handed to the constructor (duplicates included). */
  public int columnSize;

  public long sessionId;
  public long queryId;
  public boolean ignoreTimeStamp;

  /** Index of the next row to decode inside the current {@link #tsQueryDataSet}. */
  public int rowsIndex = 0;

  public TSQueryDataSet tsQueryDataSet = null;
  /** Raw bytes of the current row's timestamp. */
  public byte[] time;
  /** Bitmap byte currently in effect for each column; one byte covers 8 rows. */
  public byte[] currentBitmap;

  /**
   * Builds the cursor and its column lookup tables.
   *
   * @param sql statement text, sent back to the server on every fetch
   * @param columnNameList selected column names, duplicates allowed
   * @param columnTypeList type names parallel to {@code columnNameList}, parsed with
   *     {@link TSDataType#valueOf(String)}
   * @param columnNameIndex position of each distinct column in the deduplicated layout, or
   *     {@code null} to number distinct columns in order of first appearance
   * @param ignoreTimeStamp when {@code true} the "Time" column is not registered
   * @param queryId id of the query on the server
   * @param client RPC client used for fetching and closing
   * @param sessionId id of the session on the server
   * @param queryDataSet first batch of results, may be {@code null}
   * @param fetchSize fetch size sent with every fetch request
   */
  public AbstractIoTDBDataSet(String sql, List<String> columnNameList,
      List<String> columnTypeList, Map<String, Integer> columnNameIndex,
      boolean ignoreTimeStamp, long queryId, TSIService.Iface client, long sessionId,
      TSQueryDataSet queryDataSet, int fetchSize) {
    this.sessionId = sessionId;
    this.ignoreTimeStamp = ignoreTimeStamp;
    this.sql = sql;
    this.queryId = queryId;
    this.client = client;
    this.fetchSize = fetchSize;
    this.columnSize = columnNameList.size();

    // Note: the bare name columnNameList below is the parameter, this.columnNameList the field.
    this.columnNameList = new ArrayList<>();
    this.columnOrdinalMap = new HashMap<>();
    if (!ignoreTimeStamp) {
      this.columnNameList.add(TIMESTAMP_STR);
      this.columnOrdinalMap.put(TIMESTAMP_STR, 1);
    }

    if (columnNameIndex != null) {
      // The layout is dictated by columnNameIndex: pre-fill so types can be set by position.
      this.columnTypeDeduplicatedList = new ArrayList<>(columnNameIndex.size());
      for (int i = 0; i < columnNameIndex.size(); i++) {
        columnTypeDeduplicatedList.add(null);
      }
      for (int i = 0; i < columnNameList.size(); i++) {
        String name = columnNameList.get(i);
        this.columnNameList.add(name);
        if (!columnOrdinalMap.containsKey(name)) {
          int index = columnNameIndex.get(name);
          columnOrdinalMap.put(name, index + START_INDEX);
          columnTypeDeduplicatedList.set(index, TSDataType.valueOf(columnTypeList.get(i)));
        }
      }
    } else {
      // No explicit layout: distinct columns are numbered in order of first appearance.
      this.columnTypeDeduplicatedList = new ArrayList<>();
      int index = START_INDEX;
      for (int i = 0; i < columnNameList.size(); i++) {
        String name = columnNameList.get(i);
        this.columnNameList.add(name);
        if (!columnOrdinalMap.containsKey(name)) {
          columnOrdinalMap.put(name, index++);
          columnTypeDeduplicatedList.add(TSDataType.valueOf(columnTypeList.get(i)));
        }
      }
    }

    this.time = new byte[Long.BYTES];
    this.currentBitmap = new byte[columnNameList.size()];
    this.values = new byte[columnNameList.size()][];
    this.tsQueryDataSet = queryDataSet;
  }

  /**
   * Closes the query on the server and marks this data set closed. Calling it again is a no-op.
   *
   * <p>If the close request fails, the exception is rethrown and neither {@link #client} nor
   * {@link #isClosed} is changed.
   *
   * @throws StatementExecutionException if the server answers with a non-success status
   * @throws TException if the RPC itself fails
   */
  public void close() throws StatementExecutionException, TException {
    if (isClosed) {
      return;
    }
    if (client != null) {
      try {
        TSCloseOperationReq closeReq = new TSCloseOperationReq(sessionId);
        closeReq.setQueryId(queryId);
        TSStatus closeResp = client.closeOperation(closeReq);
        RpcUtils.verifySuccess(closeResp);
      } catch (StatementExecutionException e) {
        throw new StatementExecutionException(
            "Error occurs for close operation in server side because ", e);
      } catch (TException e) {
        throw new TException("Error occurs when connecting to server for close operation ", e);
      }
    }
    client = null;
    isClosed = true;
  }

  /**
   * Advances to the next row, fetching a new batch from the server when the current one is
   * used up.
   *
   * @return {@code true} if a row was decoded, {@code false} if no rows remain
   * @throws StatementExecutionException if fetching the next batch fails
   */
  public boolean next() throws StatementExecutionException {
    if (hasCachedResults()) {
      constructOneRow();
      return true;
    }
    if (emptyResultSet) {
      return false;
    }
    if (fetchResults()) {
      constructOneRow();
      return true;
    }
    return false;
  }

  /**
   * Requests the next batch from the server and resets {@link #rowsIndex}.
   *
   * @return {@code true} if the response carries a result set; otherwise {@link #emptyResultSet}
   *     is set and {@code false} is returned
   * @throws StatementExecutionException if the server reports a failure or the RPC fails
   */
  public boolean fetchResults() throws StatementExecutionException {
    rowsIndex = 0;
    // NOTE(review): the trailing `true` is presumably the align flag; confirm against the
    // TSFetchResultsReq definition.
    TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, true);
    try {
      TSFetchResultsResp resp = client.fetchResults(req);

      RpcUtils.verifySuccess(resp.getStatus());
      if (!resp.hasResultSet) {
        emptyResultSet = true;
      } else {
        tsQueryDataSet = resp.getQueryDataSet();
      }
      return resp.hasResultSet;
    } catch (TException e) {
      // The message used to end in a literal "{}" that was never substituted.
      throw new StatementExecutionException(
          "Cannot fetch result from server, because of network connection: " + e.getMessage(),
          e);
    }
  }

  /** Returns whether the current batch still has undecoded rows. */
  public boolean hasCachedResults() {
    return (tsQueryDataSet != null && tsQueryDataSet.time.hasRemaining());
  }

  /**
   * Decodes the next row of the current batch into {@link #time} and {@link #values}, then
   * advances {@link #rowsIndex}.
   *
   * @throws UnSupportedDataTypeException if a column has a type other than BOOLEAN, INT32,
   *     INT64, FLOAT, DOUBLE or TEXT
   */
  public void constructOneRow() {
    tsQueryDataSet.time.get(time);
    for (int i = 0; i < tsQueryDataSet.bitmapList.size(); i++) {
      // One bitmap byte covers 8 rows, so a fresh byte is consumed on every 8th row.
      if (rowsIndex % 8 == 0) {
        currentBitmap[i] = tsQueryDataSet.bitmapList.get(i).get();
      }
      values[i] = null;
      if (isNull(i, rowsIndex)) {
        continue;
      }
      ByteBuffer valueBuffer = tsQueryDataSet.valueList.get(i);
      TSDataType dataType = columnTypeDeduplicatedList.get(i);
      switch (dataType) {
        case BOOLEAN:
          values[i] = readFixedWidth(valueBuffer, 1);
          break;
        case INT32:
          values[i] = readFixedWidth(valueBuffer, Integer.BYTES);
          break;
        case INT64:
          values[i] = readFixedWidth(valueBuffer, Long.BYTES);
          break;
        case FLOAT:
          values[i] = readFixedWidth(valueBuffer, Float.BYTES);
          break;
        case DOUBLE:
          values[i] = readFixedWidth(valueBuffer, Double.BYTES);
          break;
        case TEXT:
          // TEXT is length-prefixed: a 4-byte length followed by that many bytes.
          int length = valueBuffer.getInt();
          values[i] = ReadWriteIOUtils.readBytes(valueBuffer, length);
          break;
        default:
          throw new UnSupportedDataTypeException(
              String.format("Data type %s is not supported.", dataType));
      }
    }
    rowsIndex++;
    hasCachedRecord = true;
  }

  /** Reads exactly {@code width} bytes from {@code buffer} into a new array. */
  private static byte[] readFixedWidth(ByteBuffer buffer, int width) {
    byte[] bytes = new byte[width];
    buffer.get(bytes);
    return bytes;
  }

  /** Tests the bit for {@code rowNum} in the column's current bitmap byte; 0 means absent. */
  private boolean isNull(int index, int rowNum) {
    byte bitmap = currentBitmap[index];
    int shift = rowNum % 8;
    return ((FLAG >>> shift) & bitmap) == 0;
  }

  /**
   * Returns the raw bytes of the named value column in the current row.
   *
   * @throws StatementExecutionException if the name is unknown, does not map to a value slot
   *     (for example "Time"), or the value is absent in the current row
   */
  private byte[] requireValue(String columnName) throws StatementExecutionException {
    Integer ordinal = columnOrdinalMap.get(columnName);
    if (ordinal == null) {
      throw new StatementExecutionException(
          String.format("Unknown column name: %s", columnName));
    }
    int index = ordinal - START_INDEX;
    if (index < 0 || index >= values.length) {
      throw new StatementExecutionException(
          String.format("Column %s (ordinal %d) has no value slot.", columnName, ordinal));
    }
    if (values[index] == null) {
      throw new StatementExecutionException(String.format(VALUE_IS_NULL, columnName));
    }
    return values[index];
  }

  /**
   * @param columnIndex 1-based position in {@link #columnNameList}
   * @throws StatementExecutionException if the index is out of range or the value is absent
   */
  public boolean getBoolean(int columnIndex) throws StatementExecutionException {
    return getBoolean(findColumnNameByIndex(columnIndex));
  }

  /**
   * @throws StatementExecutionException if no batch is loaded, the column is unknown, or the
   *     value is absent in the current row
   */
  public boolean getBoolean(String columnName) throws StatementExecutionException {
    checkRecord();
    return BytesUtils.bytesToBool(requireValue(columnName));
  }

  /**
   * @param columnIndex 1-based position in {@link #columnNameList}
   * @throws StatementExecutionException if the index is out of range or the value is absent
   */
  public double getDouble(int columnIndex) throws StatementExecutionException {
    return getDouble(findColumnNameByIndex(columnIndex));
  }

  /**
   * @throws StatementExecutionException if no batch is loaded, the column is unknown, or the
   *     value is absent in the current row
   */
  public double getDouble(String columnName) throws StatementExecutionException {
    checkRecord();
    return BytesUtils.bytesToDouble(requireValue(columnName));
  }

  /**
   * @param columnIndex 1-based position in {@link #columnNameList}
   * @throws StatementExecutionException if the index is out of range or the value is absent
   */
  public float getFloat(int columnIndex) throws StatementExecutionException {
    return getFloat(findColumnNameByIndex(columnIndex));
  }

  /**
   * @throws StatementExecutionException if no batch is loaded, the column is unknown, or the
   *     value is absent in the current row
   */
  public float getFloat(String columnName) throws StatementExecutionException {
    checkRecord();
    return BytesUtils.bytesToFloat(requireValue(columnName));
  }

  /**
   * @param columnIndex 1-based position in {@link #columnNameList}
   * @throws StatementExecutionException if the index is out of range or the value is absent
   */
  public int getInt(int columnIndex) throws StatementExecutionException {
    return getInt(findColumnNameByIndex(columnIndex));
  }

  /**
   * @throws StatementExecutionException if no batch is loaded, the column is unknown, or the
   *     value is absent in the current row
   */
  public int getInt(String columnName) throws StatementExecutionException {
    checkRecord();
    return BytesUtils.bytesToInt(requireValue(columnName));
  }

  /**
   * @param columnIndex 1-based position in {@link #columnNameList}
   * @throws StatementExecutionException if the index is out of range or the value is absent
   */
  public long getLong(int columnIndex) throws StatementExecutionException {
    return getLong(findColumnNameByIndex(columnIndex));
  }

  /**
   * Returns the named column as a long; the name "Time" yields the current row's timestamp.
   *
   * @throws StatementExecutionException if no batch is loaded, the column is unknown, or the
   *     value is absent in the current row
   */
  public long getLong(String columnName) throws StatementExecutionException {
    checkRecord();
    if (columnName.equals(TIMESTAMP_STR)) {
      return BytesUtils.bytesToLong(time);
    }
    return BytesUtils.bytesToLong(requireValue(columnName));
  }

  /**
   * @param columnIndex 1-based position in {@link #columnNameList}
   * @return the value rendered as a string, or {@code null} if it is absent
   */
  public Object getObject(int columnIndex) throws StatementExecutionException {
    return getObject(findColumnNameByIndex(columnIndex));
  }

  /** Same as {@link #getValueByName(String)}. */
  public Object getObject(String columnName) throws StatementExecutionException {
    return getValueByName(columnName);
  }

  /**
   * @param columnIndex 1-based position in {@link #columnNameList}
   * @return the value rendered as a string, or {@code null} if it is absent
   */
  public String getString(int columnIndex) throws StatementExecutionException {
    return getString(findColumnNameByIndex(columnIndex));
  }

  /** Same as {@link #getValueByName(String)}. */
  public String getString(String columnName) throws StatementExecutionException {
    return getValueByName(columnName);
  }

  /**
   * @param columnIndex 1-based position in {@link #columnNameList}
   * @return a {@link Timestamp} built from the column's long value
   */
  public Timestamp getTimestamp(int columnIndex) throws StatementExecutionException {
    return new Timestamp(getLong(columnIndex));
  }

  /**
   * Returns a {@link Timestamp} built from the named column's long value.
   *
   * <p>The value is looked up by name. Going through {@link #findColumn(String)} and
   * {@link #getLong(int)} would be wrong: the former returns a deduplicated ordinal, the latter
   * expects a position in {@link #columnNameList}, and the two differ when the timestamp is
   * ignored or a column is selected more than once.
   */
  public Timestamp getTimestamp(String columnName) throws StatementExecutionException {
    return new Timestamp(getLong(columnName));
  }

  /**
   * Returns the ordinal registered for {@code columnName} in {@link #columnOrdinalMap}.
   *
   * @throws NullPointerException if the name is not registered (unboxing of {@code null})
   */
  public int findColumn(String columnName) {
    return columnOrdinalMap.get(columnName);
  }

  /**
   * Renders the named column of the current row as a string; "Time" yields the timestamp.
   *
   * @return the rendered value, or {@code null} if the value is absent or the column has no
   *     value slot
   * @throws StatementExecutionException if no batch is loaded or the column is unknown
   */
  public String getValueByName(String columnName) throws StatementExecutionException {
    checkRecord();
    if (columnName.equals(TIMESTAMP_STR)) {
      return String.valueOf(BytesUtils.bytesToLong(time));
    }
    Integer ordinal = columnOrdinalMap.get(columnName);
    if (ordinal == null) {
      throw new StatementExecutionException(
          String.format("Unknown column name: %s", columnName));
    }
    int index = ordinal - START_INDEX;
    if (index < 0 || index >= values.length || values[index] == null) {
      return null;
    }
    return getString(index, columnTypeDeduplicatedList.get(index), values);
  }

  /**
   * Renders {@code values[index]} as a string according to {@code tsDataType}.
   *
   * @return the rendered value, or {@code null} for a type not handled here
   */
  protected String getString(int index, TSDataType tsDataType, byte[][] values) {
    switch (tsDataType) {
      case BOOLEAN:
        return String.valueOf(BytesUtils.bytesToBool(values[index]));
      case INT32:
        return String.valueOf(BytesUtils.bytesToInt(values[index]));
      case INT64:
        return String.valueOf(BytesUtils.bytesToLong(values[index]));
      case FLOAT:
        return String.valueOf(BytesUtils.bytesToFloat(values[index]));
      case DOUBLE:
        return String.valueOf(BytesUtils.bytesToDouble(values[index]));
      case TEXT:
        // NOTE(review): decoded with the platform default charset; confirm the encoding the
        // server writes and pass it explicitly.
        return new String(values[index]);
      default:
        return null;
    }
  }

  /**
   * Maps a 1-based position in {@link #columnNameList} to the column name.
   *
   * @throws StatementExecutionException if {@code columnIndex} is not within
   *     {@code 1..columnNameList.size()}
   */
  public String findColumnNameByIndex(int columnIndex) throws StatementExecutionException {
    if (columnIndex <= 0) {
      throw new StatementExecutionException("column index should start from 1");
    }
    if (columnIndex > columnNameList.size()) {
      throw new StatementExecutionException(
          String.format("column index %d out of range %d", columnIndex, columnNameList.size()));
    }
    return columnNameList.get(columnIndex - 1);
  }

  /**
   * Verifies that a batch is loaded.
   *
   * @throws StatementExecutionException if {@link #tsQueryDataSet} is {@code null}
   */
  public void checkRecord() throws StatementExecutionException {
    if (Objects.isNull(tsQueryDataSet)) {
      throw new StatementExecutionException("No record remains");
    }
  }

  /** Replaces the current batch; {@link #rowsIndex} is left untouched. */
  public void setTsQueryDataSet(TSQueryDataSet tsQueryDataSet) {
    this.tsQueryDataSet = tsQueryDataSet;
  }
}
