This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch RefactorDataSet in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit af76409cd14b3a6cebb6dc8a1e73316eeea3ada6 Author: JackieTien97 <[email protected]> AuthorDate: Sat May 9 17:24:16 2020 +0800 move the same construct dataset logic to service-rpc --- .../apache/iotdb/jdbc/AbstractIoTDBResultSet.java | 17 +- .../iotdb/jdbc/IoTDBNonAlignQueryResultSet.java | 8 +- .../org/apache/iotdb/jdbc/IoTDBQueryResultSet.java | 3 +- .../org/apache/iotdb/rpc/AbstractIoTDBDataSet.java | 8 +- .../org/apache/iotdb/session/SessionDataSet.java | 355 +++------------------ 5 files changed, 61 insertions(+), 330 deletions(-) diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBResultSet.java index ef6558c..2ac2079 100644 --- a/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBResultSet.java +++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBResultSet.java @@ -301,7 +301,7 @@ public abstract class AbstractIoTDBResultSet implements ResultSet { } @Override - public int getFetchDirection() throws SQLException { + public int getFetchDirection() { return ResultSet.FETCH_FORWARD; } @@ -627,11 +627,18 @@ public abstract class AbstractIoTDBResultSet implements ResultSet { @Override public boolean next() throws SQLException { - try { - return abstractIoTDBDataSet.next(); - } catch (StatementExecutionException e) { - throw new SQLException(e.getMessage()); + if (hasCachedResults()) { + constructOneRow(); + return true; + } + if (abstractIoTDBDataSet.emptyResultSet) { + return false; + } + if (fetchResults()) { + constructOneRow(); + return true; } + return false; } diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignQueryResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignQueryResultSet.java index 28e959b..fe7772c 100644 --- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignQueryResultSet.java +++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignQueryResultSet.java @@ -212,9 +212,13 @@ public class IoTDBNonAlignQueryResultSet extends AbstractIoTDBResultSet { return String.valueOf(BytesUtils.bytesToLong(times[index])); } int index = abstractIoTDBDataSet.columnOrdinalMap.get(columnName) - START_INDEX; - if (index < 0 || index >= abstractIoTDBDataSet.values.length || abstractIoTDBDataSet.values[index] == null || abstractIoTDBDataSet.values[index].length < 1) { + if (index < 0 || index >= abstractIoTDBDataSet.values.length + || abstractIoTDBDataSet.values[index] == null + || abstractIoTDBDataSet.values[index].length < 1) { return null; } - return getString(index, columnTypeDeduplicatedList.get(index), values); + return abstractIoTDBDataSet + .getString(index, abstractIoTDBDataSet.columnTypeDeduplicatedList.get(index), + abstractIoTDBDataSet.values); } } diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java index 68f2b81..5beaaae 100644 --- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java +++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java @@ -23,6 +23,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.List; import java.util.Map; +import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.service.rpc.thrift.TSIService; import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet; @@ -50,7 +51,7 @@ public class IoTDBQueryResultSet extends AbstractIoTDBResultSet { protected boolean fetchResults() throws SQLException { try { return abstractIoTDBDataSet.fetchResults(); - } catch (StatementExecutionException e) { + } catch (StatementExecutionException | IoTDBConnectionException e) { throw new SQLException(e.getMessage()); } } diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/AbstractIoTDBDataSet.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/AbstractIoTDBDataSet.java index 233c412..9a585b6 100644 --- a/service-rpc/src/main/java/org/apache/iotdb/rpc/AbstractIoTDBDataSet.java +++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/AbstractIoTDBDataSet.java @@ -147,7 +147,7 @@ public class AbstractIoTDBDataSet { isClosed = true; } - public boolean next() throws StatementExecutionException { + public boolean next() throws StatementExecutionException, IoTDBConnectionException { if (hasCachedResults()) { constructOneRow(); return true; @@ -162,7 +162,7 @@ public class AbstractIoTDBDataSet { return false; } - public boolean fetchResults() throws StatementExecutionException { + public boolean fetchResults() throws StatementExecutionException, IoTDBConnectionException { rowsIndex = 0; TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, true); try { @@ -176,7 +176,7 @@ public class AbstractIoTDBDataSet { } return resp.hasResultSet; } catch (TException e) { - throw new StatementExecutionException( + throw new IoTDBConnectionException( "Cannot fetch result from server, because of network connection: {} ", e); } } @@ -362,7 +362,7 @@ public class AbstractIoTDBDataSet { return getString(index, columnTypeDeduplicatedList.get(index), values); } - protected String getString(int index, TSDataType tsDataType, byte[][] values) { + public String getString(int index, TSDataType tsDataType, byte[][] values) { switch (tsDataType) { case BOOLEAN: return String.valueOf(BytesUtils.bytesToBool(values[index])); diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java index d11957b..992e0a7 100644 --- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java +++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java @@ -18,238 +18,63 @@ */ package org.apache.iotdb.session; -import java.nio.ByteBuffer; +import static org.apache.iotdb.rpc.AbstractIoTDBDataSet.START_INDEX; + import java.sql.Timestamp; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; +import org.apache.iotdb.rpc.AbstractIoTDBDataSet; import org.apache.iotdb.rpc.IoTDBConnectionException; -import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq; -import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq; -import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp; import org.apache.iotdb.service.rpc.thrift.TSIService; import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet; -import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Field; import org.apache.iotdb.tsfile.read.common.RowRecord; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.BytesUtils; -import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.apache.thrift.TException; public class SessionDataSet { - private static final String TIMESTAMP_STR = "Time"; - private static final int START_INDEX = 2; - private static final String VALUE_IS_NULL = "The value got by %s (column name) is NULL."; - - private boolean hasCachedRecord = false; - // indicate that there is no more data - private boolean emptyResultSet = false; - private String sql; - private long queryId; - private long sessionId; - private TSIService.Iface client; - private int fetchSize = 1024; - private List<String> columnNameList; - protected List<TSDataType> columnTypeDeduplicatedList; // deduplicated from columnTypeList - // column name -> column location - private Map<String, Integer> columnOrdinalMap; - // column size - int columnSize; - - - private int rowsIndex = 0; // used to record the row index in current TSQueryDataSet - private TSQueryDataSet tsQueryDataSet; - private byte[] currentBitmap; // used to cache the current bitmap for every column - private static final int flag = 0x80; // used to do `or` operation with bitmap to judge whether the value is null - - private byte[] time; // used to cache the current time value - private byte[][] values; // used to cache the current row record value - + private final AbstractIoTDBDataSet abstractIoTDBDataSet; public SessionDataSet(String sql, List<String> columnNameList, List<String> columnTypeList, Map<String, Integer> columnNameIndex, long queryId, TSIService.Iface client, long sessionId, TSQueryDataSet queryDataSet) { - this.sessionId = sessionId; - this.sql = sql; - this.queryId = queryId; - this.client = client; - this.columnNameList = columnNameList; - currentBitmap = new byte[columnNameList.size()]; - columnSize = columnNameList.size(); - - this.columnNameList = new ArrayList<>(); - this.columnNameList.add(TIMESTAMP_STR); - // deduplicate and map - this.columnOrdinalMap = new HashMap<>(); - this.columnOrdinalMap.put(TIMESTAMP_STR, 1); - - - // deduplicate and map - if (columnNameIndex != null) { - this.columnTypeDeduplicatedList = new ArrayList<>(columnNameIndex.size()); - for (int i = 0; i < columnNameIndex.size(); i++) { - columnTypeDeduplicatedList.add(null); - } - for (int i = 0; i < columnNameList.size(); i++) { - String name = columnNameList.get(i); - this.columnNameList.add(name); - if (!columnOrdinalMap.containsKey(name)) { - int index = columnNameIndex.get(name); - columnOrdinalMap.put(name, index + START_INDEX); - columnTypeDeduplicatedList.set(index, TSDataType.valueOf(columnTypeList.get(i))); - } - } - } else { - this.columnTypeDeduplicatedList = new ArrayList<>(); - int index = START_INDEX; - for (int i = 0; i < columnNameList.size(); i++) { - String name = columnNameList.get(i); - this.columnNameList.add(name); - if (!columnOrdinalMap.containsKey(name)) { - columnOrdinalMap.put(name, index++); - columnTypeDeduplicatedList.add(TSDataType.valueOf(columnTypeList.get(i))); - } - } - } - - time = new byte[Long.BYTES]; - values = new byte[columnNameList.size()][]; - this.tsQueryDataSet = queryDataSet; + this.abstractIoTDBDataSet = new AbstractIoTDBDataSet(sql, columnNameList, columnTypeList, columnNameIndex, false, queryId, client, sessionId, queryDataSet, 1024); } public int getFetchSize() { - return fetchSize; + return abstractIoTDBDataSet.fetchSize; } public void setFetchSize(int fetchSize) { - this.fetchSize = fetchSize; + abstractIoTDBDataSet.fetchSize = fetchSize; } public List<String> getColumnNames() { - return columnNameList; - } - - - private boolean fetchResults() throws IoTDBConnectionException, StatementExecutionException { - rowsIndex = 0; - TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, true); - try { - TSFetchResultsResp resp = client.fetchResults(req); - - RpcUtils.verifySuccess(resp.getStatus()); - if (!resp.hasResultSet) { - emptyResultSet = true; - } else { - tsQueryDataSet = resp.getQueryDataSet(); - } - return resp.hasResultSet; - } catch (TException e) { - throw new IoTDBConnectionException( - "Cannot fetch result from server, because of network connection: {} ", e); - } + return abstractIoTDBDataSet.columnNameList; } - private boolean hasCachedResults() { - return (tsQueryDataSet != null && tsQueryDataSet.time.hasRemaining()); - } public boolean hasNext() throws StatementExecutionException, IoTDBConnectionException { - - if (hasCachedRecord) { - return true; - } - - if (hasCachedResults()) { - constructOneRow(); - return true; - } - if (emptyResultSet) { - return false; - } - if (fetchResults()) { - constructOneRow(); - return true; - } - return false; - } - - private void constructOneRow() { - tsQueryDataSet.time.get(time); - for (int i = 0; i < tsQueryDataSet.bitmapList.size(); i++) { - ByteBuffer bitmapBuffer = tsQueryDataSet.bitmapList.get(i); - // another new 8 row, should move the bitmap buffer position to next byte - if (rowsIndex % 8 == 0) { - currentBitmap[i] = bitmapBuffer.get(); - } - values[i] = null; - if (!isNull(i, rowsIndex)) { - ByteBuffer valueBuffer = tsQueryDataSet.valueList.get(i); - TSDataType dataType = columnTypeDeduplicatedList.get(i); - switch (dataType) { - case BOOLEAN: - if (values[i] == null) { - values[i] = new byte[1]; - } - valueBuffer.get(values[i]); - break; - case INT32: - if (values[i] == null) { - values[i] = new byte[Integer.BYTES]; - } - valueBuffer.get(values[i]); - break; - case INT64: - if (values[i] == null) { - values[i] = new byte[Long.BYTES]; - } - valueBuffer.get(values[i]); - break; - case FLOAT: - if (values[i] == null) { - values[i] = new byte[Float.BYTES]; - } - valueBuffer.get(values[i]); - break; - case DOUBLE: - if (values[i] == null) { - values[i] = new byte[Double.BYTES]; - } - valueBuffer.get(values[i]); - break; - case TEXT: - int length = valueBuffer.getInt(); - values[i] = ReadWriteIOUtils.readBytes(valueBuffer, length); - break; - default: - throw new UnSupportedDataTypeException( - String - .format("Data type %s is not supported.", columnTypeDeduplicatedList.get(i))); - } - } - } - rowsIndex++; - hasCachedRecord = true; + return abstractIoTDBDataSet.next(); } private RowRecord constructRowRecordFromValueArray() { List<Field> outFields = new ArrayList<>(); - for (int i = 0; i < columnSize; i++) { + for (int i = 0; i < abstractIoTDBDataSet.columnSize; i++) { Field field; - int loc = columnOrdinalMap.get(columnNameList.get(i + 1)) - START_INDEX; - byte[] valueBytes = values[loc]; + int loc = abstractIoTDBDataSet.columnOrdinalMap.get(abstractIoTDBDataSet.columnNameList.get(i + 1)) - START_INDEX; + byte[] valueBytes = abstractIoTDBDataSet.values[loc]; if (valueBytes != null) { - TSDataType dataType = columnTypeDeduplicatedList.get(loc); + TSDataType dataType = abstractIoTDBDataSet.columnTypeDeduplicatedList.get(loc); field = new Field(dataType); switch (dataType) { case BOOLEAN: @@ -277,47 +102,33 @@ public class SessionDataSet { break; default: throw new UnSupportedDataTypeException(String - .format("Data type %s is not supported.", columnTypeDeduplicatedList.get(i))); + .format("Data type %s is not supported.", abstractIoTDBDataSet.columnTypeDeduplicatedList.get(i))); } } else { field = new Field(null); } outFields.add(field); } - return new RowRecord(BytesUtils.bytesToLong(time), outFields); + return new RowRecord(BytesUtils.bytesToLong(abstractIoTDBDataSet.time), outFields); } - /** - * judge whether the specified column value is null in the current position - * - * @param index column index - */ - private boolean isNull(int index, int rowNum) { - byte bitmap = currentBitmap[index]; - int shift = rowNum % 8; - return ((flag >>> shift) & bitmap) == 0; - } public RowRecord next() throws StatementExecutionException, IoTDBConnectionException { - if (!hasCachedRecord) { + if (!abstractIoTDBDataSet.hasCachedRecord) { if (!hasNext()) { return null; } } - hasCachedRecord = false; + abstractIoTDBDataSet.hasCachedRecord = false; return constructRowRecordFromValueArray(); } public void closeOperationHandle() throws StatementExecutionException, IoTDBConnectionException { try { - TSCloseOperationReq closeReq = new TSCloseOperationReq(sessionId); - closeReq.setQueryId(queryId); - TSStatus closeResp = client.closeOperation(closeReq); - RpcUtils.verifySuccess(closeResp); + abstractIoTDBDataSet.close(); } catch (TException e) { - throw new IoTDBConnectionException( - "Error occurs when connecting to server for close operation, because: " + e, e); + throw new IoTDBConnectionException(e.getMessage()); } } @@ -328,167 +139,75 @@ public class SessionDataSet { public class DataIterator { public boolean next() throws StatementExecutionException, IoTDBConnectionException { - if (hasCachedResults()) { - constructOneRow(); - return true; - } - if (emptyResultSet) { - return false; - } - if (fetchResults()) { - constructOneRow(); - return true; - } - return false; + return abstractIoTDBDataSet.next(); } public boolean getBoolean(int columnIndex) throws StatementExecutionException { - return getBoolean(findColumnNameByIndex(columnIndex)); + return abstractIoTDBDataSet.getBoolean(columnIndex); } public boolean getBoolean(String columnName) throws StatementExecutionException { - checkRecord(); - int index = columnOrdinalMap.get(columnName) - START_INDEX; - if (values[index] != null) { - return BytesUtils.bytesToBool(values[index]); - } else { - throw new StatementExecutionException(String.format(VALUE_IS_NULL, columnName)); - } + return abstractIoTDBDataSet.getBoolean(columnName); } public double getDouble(int columnIndex) throws StatementExecutionException { - return getDouble(findColumnNameByIndex(columnIndex)); + return abstractIoTDBDataSet.getDouble(columnIndex); } public double getDouble(String columnName) throws StatementExecutionException { - checkRecord(); - int index = columnOrdinalMap.get(columnName) - START_INDEX; - if (values[index] != null) { - return BytesUtils.bytesToDouble(values[index]); - } else { - throw new StatementExecutionException(String.format(VALUE_IS_NULL, columnName)); - } + return abstractIoTDBDataSet.getDouble(columnName); } public float getFloat(int columnIndex) throws StatementExecutionException { - return getFloat(findColumnNameByIndex(columnIndex)); + return abstractIoTDBDataSet.getFloat(columnIndex); } public float getFloat(String columnName) throws StatementExecutionException { - checkRecord(); - int index = columnOrdinalMap.get(columnName) - START_INDEX; - if (values[index] != null) { - return BytesUtils.bytesToFloat(values[index]); - } else { - throw new StatementExecutionException(String.format(VALUE_IS_NULL, columnName)); - } + return abstractIoTDBDataSet.getFloat(columnName); } public int getInt(int columnIndex) throws StatementExecutionException { - return getInt(findColumnNameByIndex(columnIndex)); + return abstractIoTDBDataSet.getInt(columnIndex); } public int getInt(String columnName) throws StatementExecutionException { - checkRecord(); - int index = columnOrdinalMap.get(columnName) - START_INDEX; - if (values[index] != null) { - return BytesUtils.bytesToInt(values[index]); - } else { - throw new StatementExecutionException(String.format(VALUE_IS_NULL, columnName)); - } + return abstractIoTDBDataSet.getInt(columnName); } public long getLong(int columnIndex) throws StatementExecutionException { - return getLong(findColumnNameByIndex(columnIndex)); + return abstractIoTDBDataSet.getLong(columnIndex); } public long getLong(String columnName) throws StatementExecutionException { - checkRecord(); - if (columnName.equals(TIMESTAMP_STR)) { - return BytesUtils.bytesToLong(time); - } - int index = columnOrdinalMap.get(columnName) - START_INDEX; - if (values[index] != null) { - return BytesUtils.bytesToLong(values[index]); - } else { - throw new StatementExecutionException(String.format(VALUE_IS_NULL, columnName)); - } + return abstractIoTDBDataSet.getLong(columnName); } public Object getObject(int columnIndex) throws StatementExecutionException { - return getObject(findColumnNameByIndex(columnIndex)); + return abstractIoTDBDataSet.getObject(columnIndex); } public Object getObject(String columnName) throws StatementExecutionException { - return getValueByName(columnName); + return abstractIoTDBDataSet.getObject(columnName); } public String getString(int columnIndex) throws StatementExecutionException { - return getString(findColumnNameByIndex(columnIndex)); + return abstractIoTDBDataSet.getString(columnIndex); } public String getString(String columnName) throws StatementExecutionException { - return getValueByName(columnName); + return abstractIoTDBDataSet.getString(columnName); } public Timestamp getTimestamp(int columnIndex) throws StatementExecutionException { - return new Timestamp(getLong(columnIndex)); + return abstractIoTDBDataSet.getTimestamp(columnIndex); } public Timestamp getTimestamp(String columnName) throws StatementExecutionException { - return getTimestamp(findColumn(columnName)); + return abstractIoTDBDataSet.getTimestamp(columnName); } public int findColumn(String columnName) { - return columnOrdinalMap.get(columnName); - } - - private String getValueByName(String columnName) throws StatementExecutionException { - checkRecord(); - if (columnName.equals(TIMESTAMP_STR)) { - return String.valueOf(BytesUtils.bytesToLong(time)); - } - int index = columnOrdinalMap.get(columnName) - START_INDEX; - if (index < 0 || index >= values.length || values[index] == null) { - return null; - } - return getString(index, columnTypeDeduplicatedList.get(index), values); - } - - protected String getString(int index, TSDataType tsDataType, byte[][] values) { - switch (tsDataType) { - case BOOLEAN: - return String.valueOf(BytesUtils.bytesToBool(values[index])); - case INT32: - return String.valueOf(BytesUtils.bytesToInt(values[index])); - case INT64: - return String.valueOf(BytesUtils.bytesToLong(values[index])); - case FLOAT: - return String.valueOf(BytesUtils.bytesToFloat(values[index])); - case DOUBLE: - return String.valueOf(BytesUtils.bytesToDouble(values[index])); - case TEXT: - return new String(values[index]); - default: - return null; - } - } - - private void checkRecord() throws StatementExecutionException { - if (Objects.isNull(tsQueryDataSet)) { - throw new StatementExecutionException("No record remains"); - } - } - } - - private String findColumnNameByIndex(int columnIndex) throws StatementExecutionException { - if (columnIndex <= 0) { - throw new StatementExecutionException("column index should start from 1"); - } - if (columnIndex > columnNameList.size()) { - throw new StatementExecutionException( - String.format("column index %d out of range %d", columnIndex, columnNameList.size())); + return abstractIoTDBDataSet.findColumn(columnName); } - return columnNameList.get(columnIndex - 1); } }
