This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/PrintTimeColumn in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 63eea2dd10e85de05c16990d65fe492651a74051 Author: JackieTien97 <[email protected]> AuthorDate: Tue Jul 16 20:07:41 2024 +0800 Support print time column in table model --- .../org/apache/iotdb/isession/SessionDataSet.java | 82 ++---- .../iotdb/jdbc/AbstractIoTDBJDBCResultSet.java | 4 - .../org/apache/iotdb/jdbc/IoTDBConnection.java | 8 + .../org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java | 81 ++---- .../iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java | 323 --------------------- .../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 65 ++--- .../java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java | 281 ++++++++---------- .../main/java/org/apache/iotdb/rpc/RpcUtils.java | 43 +++ .../apache/iotdb/session/SessionConnection.java | 25 +- .../org/apache/iotdb/session/ThriftConnection.java | 8 +- .../apache/iotdb/session/pool/SessionPoolTest.java | 8 +- .../protocol/thrift/impl/ClientRPCServiceImpl.java | 10 +- .../queryengine/common/header/DatasetHeader.java | 6 + .../plan/relational/planner/LogicalPlanner.java | 7 +- .../distribute/TableDistributionPlanner.java | 33 ++- .../thrift-datanode/src/main/thrift/client.thrift | 1 + 16 files changed, 305 insertions(+), 680 deletions(-) diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java index 4b3d837c07e..ce46e25e0bd 100644 --- a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java +++ b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java @@ -38,8 +38,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import static org.apache.iotdb.rpc.IoTDBRpcDataSet.START_INDEX; - public class SessionDataSet implements ISessionDataSet { private final IoTDBRpcDataSet ioTDBRpcDataSet; @@ -57,7 +55,9 @@ public class SessionDataSet implements ISessionDataSet { List<ByteBuffer> queryResult, boolean ignoreTimeStamp, boolean moreData, - ZoneId zoneId) { + ZoneId zoneId, + int timeFactor, + boolean tableModel) { this.ioTDBRpcDataSet = new IoTDBRpcDataSet( sql, @@ -74,41 +74,9 @@ public class SessionDataSet implements ISessionDataSet { SessionConfig.DEFAULT_FETCH_SIZE, 0, zoneId, - RpcUtils.DEFAULT_TIME_FORMAT); - } - - @SuppressWarnings("squid:S107") // ignore Methods should not have too many parameters - public SessionDataSet( - String sql, - List<String> columnNameList, - List<String> columnTypeList, - Map<String, Integer> columnNameIndex, - long queryId, - long statementId, - IClientRPCService.Iface client, - long sessionId, - List<ByteBuffer> queryResult, - boolean ignoreTimeStamp, - long timeout, - boolean moreData, - ZoneId zoneId) { - this.ioTDBRpcDataSet = - new IoTDBRpcDataSet( - sql, - columnNameList, - columnTypeList, - columnNameIndex, - ignoreTimeStamp, - moreData, - queryId, - statementId, - client, - sessionId, - queryResult, - SessionConfig.DEFAULT_FETCH_SIZE, - timeout, - zoneId, - RpcUtils.DEFAULT_TIME_FORMAT); + RpcUtils.DEFAULT_TIME_FORMAT, + timeFactor, + tableModel); } @SuppressWarnings("squid:S107") // ignore Methods should not have too many parameters @@ -126,7 +94,9 @@ public class SessionDataSet implements ISessionDataSet { long timeout, boolean moreData, int fetchSize, - ZoneId zoneId) { + ZoneId zoneId, + int timeFactor, + boolean tableModel) { this.ioTDBRpcDataSet = new IoTDBRpcDataSet( sql, @@ -143,7 +113,9 @@ public class SessionDataSet implements ISessionDataSet { fetchSize, timeout, zoneId, - RpcUtils.DEFAULT_TIME_FORMAT); + RpcUtils.DEFAULT_TIME_FORMAT, + timeFactor, + tableModel); } public int getFetchSize() { @@ -178,52 +150,42 @@ public class SessionDataSet implements ISessionDataSet { for (int i = 0; i < ioTDBRpcDataSet.columnSize; i++) { Field field; - int index = i + 1; - int datasetColumnIndex = i + START_INDEX; - if (ioTDBRpcDataSet.ignoreTimeStamp) { - index--; - datasetColumnIndex--; - } - int loc = - ioTDBRpcDataSet.columnOrdinalMap.get(ioTDBRpcDataSet.columnNameList.get(index)) - - START_INDEX; + String columnName = ioTDBRpcDataSet.columnNameList.get(i); - if (!ioTDBRpcDataSet.isNull(datasetColumnIndex)) { - TSDataType dataType = ioTDBRpcDataSet.columnTypeDeduplicatedList.get(loc); + if (!ioTDBRpcDataSet.isNull(columnName)) { + TSDataType dataType = ioTDBRpcDataSet.getDataType(columnName); field = new Field(dataType); switch (dataType) { case BOOLEAN: - boolean booleanValue = ioTDBRpcDataSet.getBoolean(datasetColumnIndex); + boolean booleanValue = ioTDBRpcDataSet.getBoolean(columnName); field.setBoolV(booleanValue); break; case INT32: case DATE: - int intValue = ioTDBRpcDataSet.getInt(datasetColumnIndex); + int intValue = ioTDBRpcDataSet.getInt(columnName); field.setIntV(intValue); break; case INT64: case TIMESTAMP: - long longValue = ioTDBRpcDataSet.getLong(datasetColumnIndex); + long longValue = ioTDBRpcDataSet.getLong(columnName); field.setLongV(longValue); break; case FLOAT: - float floatValue = ioTDBRpcDataSet.getFloat(datasetColumnIndex); + float floatValue = ioTDBRpcDataSet.getFloat(columnName); field.setFloatV(floatValue); break; case DOUBLE: - double doubleValue = ioTDBRpcDataSet.getDouble(datasetColumnIndex); + double doubleValue = ioTDBRpcDataSet.getDouble(columnName); field.setDoubleV(doubleValue); break; case TEXT: case BLOB: case STRING: - field.setBinaryV(ioTDBRpcDataSet.getBinary(datasetColumnIndex)); + field.setBinaryV(ioTDBRpcDataSet.getBinary(columnName)); break; default: throw new UnSupportedDataTypeException( - String.format( - "Data type %s is not supported.", - ioTDBRpcDataSet.columnTypeDeduplicatedList.get(i))); + String.format("Data type %s is not supported.", dataType)); } } else { field = new Field(null); diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBJDBCResultSet.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBJDBCResultSet.java index 55d7b0bbf55..54b1c0d4fb0 100644 --- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBJDBCResultSet.java +++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBJDBCResultSet.java @@ -454,10 +454,6 @@ public abstract class AbstractIoTDBJDBCResultSet implements ResultSet { if (statement.getResultSet() instanceof IoTDBJDBCResultSet) { operationType = ((IoTDBJDBCResultSet) statement.getResultSet()).getOperationType(); this.sgColumns = ((IoTDBJDBCResultSet) statement.getResultSet()).getSgColumns(); - } else if (statement.getResultSet() instanceof IoTDBNonAlignJDBCResultSet) { - operationType = ((IoTDBNonAlignJDBCResultSet) statement.getResultSet()).getOperationType(); - this.sgColumns = ((IoTDBNonAlignJDBCResultSet) statement.getResultSet()).getSgColumns(); - nonAlign = true; } } catch (SQLException throwables) { LOGGER.error("Get meta data error: {}", throwables.getMessage()); diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java index 721a1217ca2..e33bde2c160 100644 --- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java +++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java @@ -101,6 +101,9 @@ public class IoTDBConnection implements Connection { private String userName; + // ms is 1_000, us is 1_000_000, ns is 1_000_000_000 + private int timeFactor = 1_000; + public IoTDBConnection() { // allowed to create an instance without parameter input. } @@ -508,6 +511,7 @@ public class IoTDBConnection implements Connection { // validate connection RpcUtils.verifySuccess(openResp.getStatus()); + this.timeFactor = RpcUtils.getTimeFactor(openResp); if (protocolVersion.getValue() != openResp.getServerProtocolVersion().getValue()) { logger.warn( "Protocol differ, Client version is {}}, but Server version is {}", @@ -598,4 +602,8 @@ public class IoTDBConnection implements Connection { protected void changeDefaultDatabase(String database) { params.setDb(database); } + + public int getTimeFactor() { + return timeFactor; + } } diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java index e5c0e60e841..19e8004a33f 100644 --- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java +++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java @@ -57,16 +57,17 @@ import java.sql.Statement; import java.sql.Time; import java.sql.Timestamp; import java.time.ZoneId; -import java.util.BitSet; import java.util.Calendar; import java.util.List; import java.util.Map; +import static org.apache.iotdb.rpc.RpcUtils.convertToTimestamp; + public class IoTDBJDBCResultSet implements ResultSet { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBJDBCResultSet.class); - protected Statement statement; + protected IoTDBStatement statement; protected SQLWarning warningChain = null; protected List<String> columnTypeList; protected IoTDBRpcDataSet ioTDBRpcDataSet; @@ -79,60 +80,7 @@ public class IoTDBJDBCResultSet implements ResultSet { @SuppressWarnings("squid:S107") // ignore Methods should not have too many parameters public IoTDBJDBCResultSet( - Statement statement, - List<String> columnNameList, - List<String> columnTypeList, - Map<String, Integer> columnNameIndex, - boolean ignoreTimeStamp, - IClientRPCService.Iface client, - String sql, - long queryId, - long sessionId, - List<ByteBuffer> dataset, - TSTracingInfo tracingInfo, - long timeout, - String operationType, - List<String> columns, - List<String> sgColumns, - BitSet aliasColumnMap, - boolean moreData, - ZoneId zoneId, - Charset charset) - throws SQLException { - this.ioTDBRpcDataSet = - new IoTDBRpcDataSet( - sql, - columnNameList, - columnTypeList, - columnNameIndex, - ignoreTimeStamp, - moreData, - queryId, - ((IoTDBStatement) statement).getStmtId(), - client, - sessionId, - dataset, - statement.getFetchSize(), - timeout, - sgColumns, - aliasColumnMap, - zoneId, - timeFormat); - this.statement = statement; - this.columnTypeList = columnTypeList; - if (tracingInfo != null) { - ioTDBRpcTracingInfo = new IoTDBTracingInfo(); - ioTDBRpcTracingInfo.setTsTracingInfo(tracingInfo); - } - this.operationType = operationType; - this.columns = columns; - this.sgColumns = sgColumns; - this.charset = charset; - } - - @SuppressWarnings("squid:S107") // ignore Methods should not have too many parameters - public IoTDBJDBCResultSet( - Statement statement, + IoTDBStatement statement, List<String> columnNameList, List<String> columnTypeList, Map<String, Integer> columnNameIndex, @@ -146,7 +94,8 @@ public class IoTDBJDBCResultSet implements ResultSet { long timeout, boolean moreData, ZoneId zoneId, - Charset charset) + Charset charset, + boolean tableModel) throws SQLException { this.ioTDBRpcDataSet = new IoTDBRpcDataSet( @@ -157,14 +106,16 @@ public class IoTDBJDBCResultSet implements ResultSet { ignoreTimeStamp, moreData, queryId, - ((IoTDBStatement) statement).getStmtId(), + statement.getStmtId(), client, sessionId, dataSet, statement.getFetchSize(), timeout, zoneId, - timeFormat); + timeFormat, + statement.getTimeFactor(), + tableModel); this.statement = statement; this.columnTypeList = columnTypeList; if (tracingInfo != null) { @@ -207,8 +158,10 @@ public class IoTDBJDBCResultSet implements ResultSet { statement.getFetchSize(), timeout, zoneId, - timeFormat); - this.statement = statement; + timeFormat, + ((IoTDBStatement) statement).getTimeFactor(), + false); + this.statement = (IoTDBStatement) statement; this.columnTypeList = columnTypeList; if (tracingInfo != null) { ioTDBRpcTracingInfo = new IoTDBTracingInfo(); @@ -722,12 +675,12 @@ public class IoTDBJDBCResultSet implements ResultSet { @Override public Time getTime(int columnIndex) throws SQLException { - return new Time(getLong(columnIndex)); + long time = statement.getMilliSecond(getLong(columnIndex)); + return new Time(time); } @Override public Time getTime(String columnName) throws SQLException { - // TODO: timestamp return getTime(findColumn(columnName)); } @@ -743,7 +696,7 @@ public class IoTDBJDBCResultSet implements ResultSet { @Override public Timestamp getTimestamp(int columnIndex) throws SQLException { - return new Timestamp(getLong(columnIndex)); + return convertToTimestamp(getLong(columnIndex), statement.getTimeFactor()); } @Override diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java deleted file mode 100644 index c6a592c6781..00000000000 --- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java +++ /dev/null @@ -1,323 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.jdbc; - -import org.apache.iotdb.rpc.RpcUtils; -import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.service.rpc.thrift.IClientRPCService; -import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq; -import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp; -import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet; -import org.apache.iotdb.service.rpc.thrift.TSTracingInfo; - -import org.apache.thrift.TException; -import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.utils.BytesUtils; -import org.apache.tsfile.utils.ReadWriteIOUtils; -import org.apache.tsfile.write.UnSupportedDataTypeException; - -import java.nio.ByteBuffer; -import java.sql.SQLException; -import java.sql.Statement; -import java.time.ZoneId; -import java.util.ArrayList; -import java.util.BitSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -import static org.apache.iotdb.rpc.IoTDBRpcDataSet.START_INDEX; -import static org.apache.iotdb.rpc.IoTDBRpcDataSet.TIMESTAMP_STR; - -public class IoTDBNonAlignJDBCResultSet extends AbstractIoTDBJDBCResultSet { - - private static final int TIMESTAMP_STR_LENGTH = 4; - private static final String EMPTY_STR = ""; - private String operationType = ""; - private TSQueryNonAlignDataSet tsQueryNonAlignDataSet; - private byte[][] times; // used for disable align - private List<String> sgColumns = null; - - // for disable align clause - @SuppressWarnings("squid:S107") // ignore Methods should not have too many parameters - IoTDBNonAlignJDBCResultSet( - Statement statement, - List<String> columnNameList, - List<String> columnTypeList, - Map<String, Integer> columnNameIndex, - boolean ignoreTimeStamp, - IClientRPCService.Iface client, - String sql, - long queryId, - long sessionId, - TSQueryNonAlignDataSet dataset, - TSTracingInfo tracingInfo, - long timeout, - String operationType, - List<String> sgColumns, - BitSet aliasColumnMap, - ZoneId zoneId) - throws SQLException { - super( - statement, - columnNameList, - columnTypeList, - columnNameIndex, - ignoreTimeStamp, - client, - sql, - queryId, - sessionId, - timeout, - sgColumns, - aliasColumnMap, - zoneId); - times = new byte[columnNameList.size()][Long.BYTES]; - this.operationType = operationType; - ioTDBRpcDataSet.columnNameList = new ArrayList<>(); - ioTDBRpcDataSet.columnTypeList = new ArrayList<>(); - // deduplicate and map - ioTDBRpcDataSet.columnOrdinalMap = new HashMap<>(); - ioTDBRpcDataSet.columnOrdinalMap.put(TIMESTAMP_STR, 1); - ioTDBRpcDataSet.columnTypeDeduplicatedList = new ArrayList<>(); - ioTDBRpcDataSet.columnTypeDeduplicatedList = new ArrayList<>(columnNameIndex.size()); - for (int i = 0; i < columnNameIndex.size(); i++) { - ioTDBRpcDataSet.columnTypeDeduplicatedList.add(null); - } - List<String> newSgColumns = new ArrayList<>(); - for (int i = 0; i < columnNameList.size(); i++) { - String name = ""; - if (sgColumns != null && !sgColumns.isEmpty()) { - name = sgColumns.get(i) + "." + columnNameList.get(i); - newSgColumns.add(sgColumns.get(i)); - newSgColumns.add(sgColumns.get(i)); - } else { - name = columnNameList.get(i); - newSgColumns.add(""); - newSgColumns.add(""); - } - ioTDBRpcDataSet.columnNameList.add(TIMESTAMP_STR + name); - ioTDBRpcDataSet.columnNameList.add(name); - ioTDBRpcDataSet.columnTypeList.add(String.valueOf(TSDataType.INT64)); - ioTDBRpcDataSet.columnTypeList.add(columnTypeList.get(i)); - if (!ioTDBRpcDataSet.columnOrdinalMap.containsKey(name)) { - int index = columnNameIndex.get(name); - ioTDBRpcDataSet.columnOrdinalMap.put(name, index + START_INDEX); - ioTDBRpcDataSet.columnTypeDeduplicatedList.set( - index, TSDataType.valueOf(columnTypeList.get(i))); - } - } - this.sgColumns = newSgColumns; - this.tsQueryNonAlignDataSet = dataset; - if (tracingInfo != null) { - ioTDBRpcTracingInfo = new IoTDBTracingInfo(); - ioTDBRpcTracingInfo.setTsTracingInfo(tracingInfo); - } - } - - @Override - public long getLong(String columnName) throws SQLException { - checkRecord(); - if (columnName.startsWith(TIMESTAMP_STR)) { - String column = columnName.substring(TIMESTAMP_STR_LENGTH); - int index = ioTDBRpcDataSet.columnOrdinalMap.get(column) - START_INDEX; - if (times[index] != null) { - ioTDBRpcDataSet.lastReadWasNull = false; - return BytesUtils.bytesToLong(times[index]); - } else { - ioTDBRpcDataSet.lastReadWasNull = true; - return 0; - } - } - int index = ioTDBRpcDataSet.columnOrdinalMap.get(columnName) - START_INDEX; - if (ioTDBRpcDataSet.values[index] != null) { - ioTDBRpcDataSet.lastReadWasNull = false; - return BytesUtils.bytesToLong(ioTDBRpcDataSet.values[index]); - } else { - ioTDBRpcDataSet.lastReadWasNull = true; - return 0; - } - } - - @Override - protected boolean fetchResults() throws SQLException { - TSFetchResultsReq req = - new TSFetchResultsReq( - ioTDBRpcDataSet.sessionId, - ioTDBRpcDataSet.sql, - ioTDBRpcDataSet.fetchSize, - ioTDBRpcDataSet.queryId, - false); - req.setTimeout(ioTDBRpcDataSet.timeout); - try { - TSFetchResultsResp resp = ioTDBRpcDataSet.client.fetchResults(req); - - rpcUtilsVerifySuccess(resp); - if (!resp.hasResultSet) { - ioTDBRpcDataSet.emptyResultSet = true; - close(); - } else { - tsQueryNonAlignDataSet = resp.getNonAlignQueryDataSet(); - if (tsQueryNonAlignDataSet == null) { - ioTDBRpcDataSet.emptyResultSet = true; - close(); - return false; - } - } - return resp.hasResultSet; - } catch (TException e) { - throw new SQLException( - "Cannot fetch result from server, because of network connection: {} ", e); - } - } - - private static void rpcUtilsVerifySuccess(TSFetchResultsResp resp) throws IoTDBSQLException { - try { - RpcUtils.verifySuccess(resp.getStatus()); - } catch (StatementExecutionException e) { - throw new IoTDBSQLException(e.getMessage(), resp.getStatus()); - } - } - - @Override - protected boolean hasCachedResults() { - return (tsQueryNonAlignDataSet != null && hasTimesRemaining()); - } - - // check if has times remaining for disable align clause - private boolean hasTimesRemaining() { - for (ByteBuffer time : tsQueryNonAlignDataSet.timeList) { - if (time.hasRemaining()) { - return true; - } - } - return false; - } - - @Override - protected void constructOneRow() { - ioTDBRpcDataSet.lastReadWasNull = false; - for (int i = 0; i < tsQueryNonAlignDataSet.timeList.size(); i++) { - times[i] = null; - ioTDBRpcDataSet.values[i] = null; - if (tsQueryNonAlignDataSet.timeList.get(i).remaining() >= Long.BYTES) { - - times[i] = new byte[Long.BYTES]; - - tsQueryNonAlignDataSet.timeList.get(i).get(times[i]); - ByteBuffer valueBuffer = tsQueryNonAlignDataSet.valueList.get(i); - TSDataType dataType = ioTDBRpcDataSet.columnTypeDeduplicatedList.get(i); - switch (dataType) { - case BOOLEAN: - ioTDBRpcDataSet.values[i] = new byte[1]; - valueBuffer.get(ioTDBRpcDataSet.values[i]); - break; - case INT32: - ioTDBRpcDataSet.values[i] = new byte[Integer.BYTES]; - valueBuffer.get(ioTDBRpcDataSet.values[i]); - break; - case INT64: - ioTDBRpcDataSet.values[i] = new byte[Long.BYTES]; - valueBuffer.get(ioTDBRpcDataSet.values[i]); - break; - case FLOAT: - ioTDBRpcDataSet.values[i] = new byte[Float.BYTES]; - valueBuffer.get(ioTDBRpcDataSet.values[i]); - break; - case DOUBLE: - ioTDBRpcDataSet.values[i] = new byte[Double.BYTES]; - valueBuffer.get(ioTDBRpcDataSet.values[i]); - break; - case TEXT: - int length = valueBuffer.getInt(); - ioTDBRpcDataSet.values[i] = ReadWriteIOUtils.readBytes(valueBuffer, length); - break; - default: - throw new UnSupportedDataTypeException( - String.format( - "Data type %s is not supported.", - ioTDBRpcDataSet.columnTypeDeduplicatedList.get(i))); - } - } else { - ioTDBRpcDataSet.values[i] = EMPTY_STR.getBytes(); - } - } - } - - @Override - protected void checkRecord() throws SQLException { - if (Objects.isNull(tsQueryNonAlignDataSet)) { - throw new SQLException("No record remains"); - } - } - - @Override - protected String getValueByName(String columnName) throws SQLException { - checkRecord(); - if (columnName.startsWith(TIMESTAMP_STR)) { - String column = columnName.substring(TIMESTAMP_STR_LENGTH); - int index = ioTDBRpcDataSet.columnOrdinalMap.get(column) - START_INDEX; - if (times[index] == null || times[index].length == 0) { - return null; - } - return String.valueOf(BytesUtils.bytesToLong(times[index])); - } - int index = ioTDBRpcDataSet.columnOrdinalMap.get(columnName) - START_INDEX; - if (index < 0 - || index >= ioTDBRpcDataSet.values.length - || ioTDBRpcDataSet.values[index] == null - || ioTDBRpcDataSet.values[index].length < 1) { - return null; - } - return ioTDBRpcDataSet.getString( - index, ioTDBRpcDataSet.columnTypeDeduplicatedList.get(index), ioTDBRpcDataSet.values); - } - - @Override - protected Object getObjectByName(String columnName) throws SQLException { - checkRecord(); - if (columnName.startsWith(TIMESTAMP_STR)) { - String column = columnName.substring(TIMESTAMP_STR_LENGTH); - int index = ioTDBRpcDataSet.columnOrdinalMap.get(column) - START_INDEX; - if (times[index] == null || times[index].length == 0) { - return null; - } - return BytesUtils.bytesToLong(times[index]); - } - int index = ioTDBRpcDataSet.columnOrdinalMap.get(columnName) - START_INDEX; - if (index < 0 - || index >= ioTDBRpcDataSet.values.length - || ioTDBRpcDataSet.values[index] == null - || ioTDBRpcDataSet.values[index].length < 1) { - return null; - } - return ioTDBRpcDataSet.getObject( - index, ioTDBRpcDataSet.columnTypeDeduplicatedList.get(index), ioTDBRpcDataSet.values); - } - - public String getOperationType() { - return this.operationType; - } - - public List<String> getSgColumns() { - return sgColumns; - } -} diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java index 4dd1d505587..88cbb1fcfa0 100644 --- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java +++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java @@ -328,25 +328,7 @@ public class IoTDBStatement implements Statement { if (execResp.isSetColumns()) { queryId = execResp.getQueryId(); if (execResp.queryResult == null) { - BitSet aliasColumn = listToBitSet(execResp.getAliasColumns()); - this.resultSet = - new IoTDBNonAlignJDBCResultSet( - this, - execResp.getColumns(), - execResp.getDataTypeList(), - execResp.columnNameIndexMap, - execResp.ignoreTimeStamp, - client, - sql, - queryId, - sessionId, - execResp.nonAlignQueryDataSet, - execResp.tracingInfo, - execReq.timeout, - execResp.operationType, - execResp.getSgColumns(), - aliasColumn, - zoneId); + throw new SQLException("execResp.queryResult should never be null."); } else { this.resultSet = new IoTDBJDBCResultSet( @@ -364,7 +346,8 @@ public class IoTDBStatement implements Statement { execReq.timeout, execResp.moreData, zoneId, - charset); + charset, + execResp.isSetTableModel() && execResp.isTableModel()); } return true; } @@ -476,29 +459,8 @@ public class IoTDBStatement implements Statement { throw new IoTDBSQLException(e.getMessage(), execResp.getStatus()); } - BitSet aliasColumn = null; - if (execResp.getAliasColumns() != null && !execResp.getAliasColumns().isEmpty()) { - aliasColumn = listToBitSet(execResp.getAliasColumns()); - } if (execResp.queryResult == null) { - this.resultSet = - new IoTDBNonAlignJDBCResultSet( - this, - execResp.getColumns(), - execResp.getDataTypeList(), - execResp.columnNameIndexMap, - execResp.ignoreTimeStamp, - client, - sql, - queryId, - sessionId, - execResp.nonAlignQueryDataSet, - execResp.tracingInfo, - execReq.timeout, - execResp.operationType, - execResp.sgColumns, - aliasColumn, - zoneId); + throw new SQLException("execResp.queryResult should never be null."); } else { this.resultSet = new IoTDBJDBCResultSet( @@ -514,13 +476,10 @@ public class IoTDBStatement implements Statement { execResp.queryResult, execResp.tracingInfo, execReq.timeout, - execResp.operationType, - execResp.columns, - execResp.sgColumns, - aliasColumn, execResp.moreData, zoneId, - charset); + charset, + execResp.isSetTableModel() && execResp.isTableModel()); } return resultSet; } @@ -782,4 +741,16 @@ public class IoTDBStatement implements Statement { public long getStmtId() { return stmtId; } + + public long getMilliSecond(long time) { + return RpcUtils.getMilliSecond(time, connection.getTimeFactor()); + } + + public int getNanoSecond(long time) { + return RpcUtils.getNanoSecond(time, connection.getTimeFactor()); + } + + public int getTimeFactor() { + return connection.getTimeFactor(); + } } diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java index 3e405c95d82..39391d44b74 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java @@ -38,24 +38,27 @@ import java.nio.ByteBuffer; import java.sql.Timestamp; import java.time.ZoneId; import java.util.ArrayList; -import java.util.BitSet; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.iotdb.rpc.RpcUtils.convertToTimestamp; + public class IoTDBRpcDataSet { public static final String TIMESTAMP_STR = "Time"; - public static final int START_INDEX = 2; public String sql; public boolean isClosed = false; public IClientRPCService.Iface client; public List<String> columnNameList; // no deduplication public List<String> columnTypeList; // no deduplication - public Map<String, Integer> + private final Map<String, Integer> columnOrdinalMap; // used because the server returns deduplicated columns - public List<TSDataType> columnTypeDeduplicatedList; // deduplicated from columnTypeList + // record original time column index, like select s1, time, s2 from table, timeOriginColumnIndex + // will be 2 + private int timeOriginColumnIndex = 1; + private List<TSDataType> columnTypeDeduplicatedList; // deduplicated from columnTypeList public int fetchSize; public final long timeout; public boolean hasCachedRecord = false; @@ -82,6 +85,11 @@ public class IoTDBRpcDataSet { private final ZoneId zoneId; private final String timeFormat; + private final int timeFactor; + + // 2 for tree model and 1 for table model + private final int startIndex; + @SuppressWarnings({"squid:S3776", "squid:S107"}) // Suppress high Cognitive Complexity warning public IoTDBRpcDataSet( String sql, @@ -98,7 +106,10 @@ public class IoTDBRpcDataSet { int fetchSize, long timeout, ZoneId zoneId, - String timeFormat) { + String timeFormat, + int timeFactor, + boolean tableModel) { + this.startIndex = tableModel ? 1 : 2; this.sessionId = sessionId; this.statementId = statementId; this.ignoreTimeStamp = ignoreTimeStamp; @@ -135,15 +146,20 @@ public class IoTDBRpcDataSet { this.columnTypeList.add(columnTypeList.get(i)); if (!columnOrdinalMap.containsKey(name)) { int index = columnNameIndex.get(name); - if (!columnOrdinalMap.containsValue(index + START_INDEX)) { - columnTypeDeduplicatedList.set(index, TSDataType.valueOf(columnTypeList.get(i))); + if (index >= 0) { + if (columnTypeDeduplicatedList.get(index) != null) { + columnTypeDeduplicatedList.set(index, TSDataType.valueOf(columnTypeList.get(i))); + } + } else { + // -1 for Time Column + timeOriginColumnIndex = i + 1; } - columnOrdinalMap.put(name, index + START_INDEX); + columnOrdinalMap.put(name, index + startIndex); } } } else { this.columnTypeDeduplicatedList = new ArrayList<>(); - AtomicInteger index = new AtomicInteger(START_INDEX); + AtomicInteger index = new AtomicInteger(startIndex); for (int i = 0; i < columnNameList.size(); i++) { String name = columnNameList.get(i); this.columnNameList.add(name); @@ -164,6 +180,7 @@ public class IoTDBRpcDataSet { this.tsBlockIndex = -1; this.zoneId = zoneId; this.timeFormat = timeFormat; + this.timeFactor = timeFactor; } public Integer addColumnTypeListReturnIndex(AtomicInteger index, TSDataType dataType) { @@ -171,105 +188,6 @@ public class IoTDBRpcDataSet { return index.getAndIncrement(); } - @SuppressWarnings({ - "squid:S3776", - "squid:S107" - }) // ignore Cognitive Complexity of methods should not be too high - public IoTDBRpcDataSet( - String sql, - List<String> columnNameList, - List<String> columnTypeList, - Map<String, Integer> columnNameIndex, - boolean ignoreTimeStamp, - boolean moreData, - long queryId, - long statementId, - IClientRPCService.Iface client, - long sessionId, - List<ByteBuffer> queryResult, - int fetchSize, - long timeout, - List<String> sgList, - BitSet aliasColumnMap, - ZoneId zoneId, - String timeFormat) { - this.sessionId = sessionId; - this.statementId = statementId; - this.ignoreTimeStamp = ignoreTimeStamp; - this.sql = sql; - this.queryId = queryId; - this.client = client; - this.fetchSize = fetchSize; - this.timeout = timeout; - this.moreData = moreData; - columnSize = columnNameList.size(); - - this.columnNameList = new ArrayList<>(); - this.columnTypeList = new ArrayList<>(); - if (!ignoreTimeStamp) { - this.columnNameList.add(TIMESTAMP_STR); - this.columnTypeList.add(String.valueOf(TSDataType.INT64)); - } - // deduplicate and map - this.columnOrdinalMap = new HashMap<>(); - if (!ignoreTimeStamp) { - this.columnOrdinalMap.put(TIMESTAMP_STR, 1); - } - - // deduplicate and map - if (columnNameIndex != null) { - int deduplicatedColumnSize = (int) columnNameIndex.values().stream().distinct().count(); - this.columnTypeDeduplicatedList = new ArrayList<>(deduplicatedColumnSize); - for (int i = 0; i < deduplicatedColumnSize; i++) { - columnTypeDeduplicatedList.add(null); - } - for (int i = 0; i < columnNameList.size(); i++) { - String name; - if (sgList != null - && !sgList.isEmpty() - && (aliasColumnMap == null || !aliasColumnMap.get(i))) { - name = sgList.get(i) + "." + columnNameList.get(i); - } else { - name = columnNameList.get(i); - } - - this.columnNameList.add(name); - this.columnTypeList.add(columnTypeList.get(i)); - // "Time".equals(name) -> to allow the Time column appear in value columns - if (!columnOrdinalMap.containsKey(name) || "Time".equals(name)) { - int index = columnNameIndex.get(name); - if (!columnOrdinalMap.containsValue(index + START_INDEX)) { - columnTypeDeduplicatedList.set(index, TSDataType.valueOf(columnTypeList.get(i))); - } - columnOrdinalMap.put(name, index + START_INDEX); - } - } - } else { - this.columnTypeDeduplicatedList = new ArrayList<>(); - int index = START_INDEX; - for (int i = 0; i < columnNameList.size(); i++) { - String name = columnNameList.get(i); - this.columnNameList.add(name); - this.columnTypeList.add(columnTypeList.get(i)); - if (!columnOrdinalMap.containsKey(name)) { - columnOrdinalMap.put(name, index++); - columnTypeDeduplicatedList.add(TSDataType.valueOf(columnTypeList.get(i))); - } - } - } - - this.queryResult = queryResult; - this.queryResultSize = 0; - if (queryResult != null) { - this.queryResultSize = queryResult.size(); - } - this.queryResultIndex = 0; - this.tsBlockSize = 0; - this.tsBlockIndex = -1; - this.zoneId = zoneId; - this.timeFormat = timeFormat; - } - public void close() throws StatementExecutionException, TException { if (isClosed) { return; @@ -369,25 +287,16 @@ public class IoTDBRpcDataSet { } public boolean isNull(int columnIndex) throws StatementExecutionException { - int index = columnOrdinalMap.get(findColumnNameByIndex(columnIndex)) - START_INDEX; - // time column will never be null - if (index < 0) { - return true; - } - return isNull(index, tsBlockIndex); + return isNull(findColumnNameByIndex(columnIndex)); } public boolean isNull(String columnName) { - int index = columnOrdinalMap.get(columnName) - START_INDEX; - // time column will never be null - if (index < 0) { - return true; - } - return isNull(index, tsBlockIndex); + return isNull(getTsBlockColumnIndex(columnName), tsBlockIndex); } private boolean isNull(int index, int rowNum) { - return curTsBlock.getColumn(index).isNull(rowNum); + // -1 for time column which will never be null + return index < 0 || curTsBlock.getColumn(index).isNull(rowNum); } public boolean getBoolean(int columnIndex) throws StatementExecutionException { @@ -396,7 +305,7 @@ public class IoTDBRpcDataSet { public boolean getBoolean(String columnName) throws StatementExecutionException { checkRecord(); - int index = columnOrdinalMap.get(columnName) - START_INDEX; + int index = getTsBlockColumnIndex(columnName); if (!isNull(index, tsBlockIndex)) { lastReadWasNull = false; return curTsBlock.getColumn(index).getBoolean(tsBlockIndex); @@ -412,7 +321,7 @@ public class IoTDBRpcDataSet { public double getDouble(String columnName) throws StatementExecutionException { checkRecord(); - int index = columnOrdinalMap.get(columnName) - START_INDEX; + int index = getTsBlockColumnIndex(columnName); if (!isNull(index, tsBlockIndex)) { lastReadWasNull = false; return curTsBlock.getColumn(index).getDouble(tsBlockIndex); @@ -428,7 +337,7 @@ public class IoTDBRpcDataSet { public float getFloat(String columnName) throws StatementExecutionException { checkRecord(); - int index = columnOrdinalMap.get(columnName) - START_INDEX; + int index = getTsBlockColumnIndex(columnName); if (!isNull(index, tsBlockIndex)) { lastReadWasNull = false; return curTsBlock.getColumn(index).getFloat(tsBlockIndex); @@ -444,7 +353,7 @@ public class IoTDBRpcDataSet { public int getInt(String columnName) throws StatementExecutionException { checkRecord(); - int index = columnOrdinalMap.get(columnName) - START_INDEX; + int index = getTsBlockColumnIndex(columnName); if (!isNull(index, tsBlockIndex)) { lastReadWasNull = false; TSDataType type = curTsBlock.getColumn(index).getDataType(); @@ -465,21 +374,25 @@ public class IoTDBRpcDataSet { public long getLong(String columnName) throws StatementExecutionException { checkRecord(); - if (columnName.equals(TIMESTAMP_STR)) { - return curTsBlock.getTimeByIndex(tsBlockIndex); - } - int index = columnOrdinalMap.get(columnName) - START_INDEX; - if (!isNull(index, tsBlockIndex)) { + int index = getTsBlockColumnIndex(columnName); + + // take care of time column + if (index < 0) { lastReadWasNull = false; - TSDataType type = curTsBlock.getColumn(index).getDataType(); - if (type == TSDataType.INT32) { - return curTsBlock.getColumn(index).getInt(tsBlockIndex); + return curTsBlock.getTimeByIndex(index); + } else { + if (!isNull(index, tsBlockIndex)) { + lastReadWasNull = false; + TSDataType type = curTsBlock.getColumn(index).getDataType(); + if (type == TSDataType.INT32) { + return curTsBlock.getColumn(index).getInt(tsBlockIndex); + } else { + return curTsBlock.getColumn(index).getLong(tsBlockIndex); + } } else { - return curTsBlock.getColumn(index).getLong(tsBlockIndex); + lastReadWasNull = true; + return 0; } - } else { - lastReadWasNull = true; - return 0; } } @@ -489,7 +402,7 @@ public class IoTDBRpcDataSet { public Binary getBinary(String columnName) throws StatementExecutionException { checkRecord(); - int index = columnOrdinalMap.get(columnName) - START_INDEX; + int index = getTsBlockColumnIndex(columnName); if (!isNull(index, tsBlockIndex)) { lastReadWasNull = false; return curTsBlock.getColumn(index).getBinary(tsBlockIndex); @@ -516,43 +429,46 @@ public class IoTDBRpcDataSet { } public Timestamp getTimestamp(int columnIndex) throws StatementExecutionException { - return new Timestamp(getLong(columnIndex)); + return getTimestamp(findColumnNameByIndex(columnIndex)); } public Timestamp getTimestamp(String columnName) throws StatementExecutionException { - return getTimestamp(findColumn(columnName)); + return convertToTimestamp(getLong(columnName), timeFactor); } public TSDataType getDataType(int columnIndex) throws StatementExecutionException { return getDataType(findColumnNameByIndex(columnIndex)); } - public TSDataType getDataType(String columnName) throws StatementExecutionException { - if (columnName.equals(TIMESTAMP_STR)) { - return TSDataType.INT64; + public TSDataType getDataType(String columnName) { + final int index = getTsBlockColumnIndex(columnName); + if (index == -1) { + return TSDataType.TIMESTAMP; + } else if (index >= 0 && index < columnTypeDeduplicatedList.size()) { + return columnTypeDeduplicatedList.get(index); + } else { + return null; } - final int index = columnOrdinalMap.get(columnName) - START_INDEX; - return index < 0 || index >= columnTypeDeduplicatedList.size() - ? null - : columnTypeDeduplicatedList.get(index); } public int findColumn(String columnName) { - return columnOrdinalMap.get(columnName); + int columnIndex = columnOrdinalMap.get(columnName); + return columnIndex == -1 ? timeOriginColumnIndex : columnIndex; } public String getValueByName(String columnName) throws StatementExecutionException { checkRecord(); - if (columnName.equals(TIMESTAMP_STR)) { + // to keep compatibility, tree model should return a long value for time column + if (startIndex == 2 && columnName.equals(TIMESTAMP_STR)) { return String.valueOf(curTsBlock.getTimeByIndex(tsBlockIndex)); } - int index = columnOrdinalMap.get(columnName) - START_INDEX; - if (index < 0 || index >= columnTypeDeduplicatedList.size() || isNull(index, tsBlockIndex)) { + int index = getTsBlockColumnIndex(columnName); + if (isNull(index, tsBlockIndex)) { lastReadWasNull = true; return null; } lastReadWasNull = false; - return getString(index, columnTypeDeduplicatedList.get(index)); + return getString(index, getDataTypeByTsBlockColumnIndex(index)); } public String getString(int index, TSDataType tsDataType) { @@ -562,8 +478,16 @@ public class IoTDBRpcDataSet { case INT32: return String.valueOf(curTsBlock.getColumn(index).getInt(tsBlockIndex)); case INT64: + return String.valueOf( + (index == -1 + ? curTsBlock.getTimeByIndex(tsBlockIndex) + : curTsBlock.getColumn(index).getLong(tsBlockIndex))); case TIMESTAMP: - return String.valueOf(curTsBlock.getColumn(index).getLong(tsBlockIndex)); + long timestamp = + (index == -1 + ? curTsBlock.getTimeByIndex(tsBlockIndex) + : curTsBlock.getColumn(index).getLong(tsBlockIndex)); + return String.valueOf(convertToTimestamp(timestamp, timeFactor)); case FLOAT: return String.valueOf(curTsBlock.getColumn(index).getFloat(tsBlockIndex)); case DOUBLE: @@ -586,16 +510,46 @@ public class IoTDBRpcDataSet { public Object getObjectByName(String columnName) throws StatementExecutionException { checkRecord(); - if (columnName.equals(TIMESTAMP_STR)) { - return new Timestamp(curTsBlock.getTimeByIndex(tsBlockIndex)); - } - int index = columnOrdinalMap.get(columnName) - START_INDEX; - if (index < 0 || index >= columnTypeDeduplicatedList.size() || isNull(index, tsBlockIndex)) { + int index = getTsBlockColumnIndex(columnName); + if (isNull(index, tsBlockIndex)) { lastReadWasNull = true; return null; } lastReadWasNull = false; - return curTsBlock.getColumn(index).getObject(tsBlockIndex); + TSDataType tsDataType = getDataTypeByTsBlockColumnIndex(index); + switch (tsDataType) { + case BOOLEAN: + case INT32: + case INT64: + case FLOAT: + case DOUBLE: + return curTsBlock.getColumn(index).getObject(tsBlockIndex); + case TIMESTAMP: + long timestamp = + (index == -1 + ? curTsBlock.getTimeByIndex(tsBlockIndex) + : curTsBlock.getColumn(index).getLong(tsBlockIndex)); + return convertToTimestamp(timestamp, timeFactor); + case TEXT: + case STRING: + return curTsBlock + .getColumn(index) + .getBinary(tsBlockIndex) + .getStringValue(TSFileConfig.STRING_CHARSET); + case BLOB: + return BytesUtils.parseBlobByteArrayToString( + curTsBlock.getColumn(index).getBinary(tsBlockIndex).getValues()); + case DATE: + return DateUtils.formatDate(curTsBlock.getColumn(index).getInt(tsBlockIndex)); + default: + return null; + } + } + + private TSDataType getDataTypeByTsBlockColumnIndex(int tsBlockColumnIndex) { + return tsBlockColumnIndex < 0 + ? TSDataType.TIMESTAMP + : columnTypeDeduplicatedList.get(tsBlockColumnIndex); } public String findColumnNameByIndex(int columnIndex) throws StatementExecutionException { @@ -609,6 +563,15 @@ public class IoTDBRpcDataSet { return columnNameList.get(columnIndex - 1); } + // return -1 for time column + private int getTsBlockColumnIndex(String columnName) { + Integer index = columnOrdinalMap.get(columnName); + if (index == null) { + throw new IllegalArgumentException("Unknown column name :" + columnName); + } + return index - startIndex; + } + public void checkRecord() throws StatementExecutionException { if (queryResultIndex > queryResultSize || tsBlockIndex >= tsBlockSize diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java index 06aac75aa66..fadf2cdadf0 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java @@ -24,11 +24,13 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.service.rpc.thrift.IClientRPCService; import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp; import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp; +import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.Proxy; +import java.sql.Timestamp; import java.text.SimpleDateFormat; import java.time.Instant; import java.time.ZoneId; @@ -63,6 +65,14 @@ public class RpcUtils { public static final long MIN_SHRINK_INTERVAL = 60_000L; + public static final String TIME_PRECISION = "timestamp_precision"; + + public static final String MILLISECOND = "ms"; + + public static final String MICROSECOND = "us"; + + public static final String NANOSECOND = "ns"; + private RpcUtils() { // util class } @@ -346,4 +356,37 @@ public class RpcUtils { public static boolean isUseDatabase(String sql) { return sql.length() > 4 && "use ".equalsIgnoreCase(sql.substring(0, 4)); } + + public static long getMilliSecond(long time, int timeFactor) { + return time / timeFactor * 1_000; + } + + public static int getNanoSecond(long time, int timeFactor) { + return (int) (time % timeFactor * (1_000_000_000 / timeFactor)); + } + + public static Timestamp convertToTimestamp(long time, int timeFactor) { + Timestamp res = new Timestamp(getMilliSecond(time, timeFactor)); + res.setNanos(getNanoSecond(time, timeFactor)); + return res; + } + + public static int getTimeFactor(TSOpenSessionResp openResp) { + if (openResp.isSetConfiguration()) { + String precision = openResp.getConfiguration().get(TIME_PRECISION); + if (precision != null) { + switch (precision) { + case MILLISECOND: + return 1_000; + case MICROSECOND: + return 1_000_000; + case NANOSECOND: + return 1_000_000_000; + default: + throw new IllegalArgumentException("Unknown time precision: " + precision); + } + } + } + return 1_000; + } } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java index 37f9f72f39d..3df2f1a351c 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java @@ -109,6 +109,9 @@ public class SessionConnection { private final String database; + // ms is 1_000, us is 1_000_000, ns is 1_000_000_000 + private int timeFactor = 1_000; + // TestOnly public SessionConnection() { availableNodes = Collections::emptyList; @@ -213,7 +216,7 @@ public class SessionConnection { TSOpenSessionResp openResp = client.openSession(openReq); RpcUtils.verifySuccess(openResp.getStatus()); - + this.timeFactor = RpcUtils.getTimeFactor(openResp); if (Session.protocolVersion.getValue() != openResp.getServerProtocolVersion().getValue()) { logger.warn( "Protocol differ, Client version is {}}, but Server version is {}", @@ -457,7 +460,9 @@ public class SessionConnection { timeout, execResp.moreData, session.fetchSize, - zoneId); + zoneId, + timeFactor, + execResp.isSetTableModel() && execResp.isTableModel()); } protected void executeNonQueryStatement(String sql) @@ -557,7 +562,9 @@ public class SessionConnection { execResp.queryResult, execResp.isIgnoreTimeStamp(), execResp.moreData, - zoneId); + zoneId, + timeFactor, + execResp.isSetTableModel() && execResp.isTableModel()); } protected Pair<SessionDataSet, TEndPoint> executeLastDataQueryForOneDevice( @@ -604,7 +611,9 @@ public class SessionConnection { tsExecuteStatementResp.queryResult, tsExecuteStatementResp.isIgnoreTimeStamp(), tsExecuteStatementResp.moreData, - zoneId), + zoneId, + timeFactor, + tsExecuteStatementResp.isSetTableModel() && tsExecuteStatementResp.isTableModel()), redirectedEndPoint); } @@ -646,7 +655,9 @@ public class SessionConnection { tsExecuteStatementResp.queryResult, tsExecuteStatementResp.isIgnoreTimeStamp(), tsExecuteStatementResp.moreData, - zoneId); + zoneId, + timeFactor, + tsExecuteStatementResp.isSetTableModel() && tsExecuteStatementResp.isTableModel()); } protected SessionDataSet executeAggregationQuery( @@ -728,7 +739,9 @@ public class SessionConnection { tsExecuteStatementResp.queryResult, tsExecuteStatementResp.isIgnoreTimeStamp(), tsExecuteStatementResp.moreData, - zoneId); + zoneId, + timeFactor, + tsExecuteStatementResp.isSetTableModel() && tsExecuteStatementResp.isTableModel()); } private TSAggregationQueryReq createAggregationQueryReq( diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/ThriftConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/ThriftConnection.java index 65dc1f3ad2c..7b62754bb3d 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/ThriftConnection.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/ThriftConnection.java @@ -60,6 +60,8 @@ public class ThriftConnection { protected long statementId; private ZoneId zoneId; + private int timeFactor; + public ThriftConnection( TEndPoint endPoint, int thriftDefaultBufferSize, @@ -124,6 +126,8 @@ public class ThriftConnection { RpcUtils.verifySuccess(openResp.getStatus()); + this.timeFactor = RpcUtils.getTimeFactor(openResp); + if (Session.protocolVersion.getValue() != openResp.getServerProtocolVersion().getValue()) { LOGGER.warn( "Protocol differ, Client version is {}}, but Server version is {}", @@ -175,7 +179,9 @@ public class ThriftConnection { timeout, execResp.moreData, fetchSize, - zoneId); + zoneId, + timeFactor, + execResp.isSetTableModel() && execResp.isTableModel()); } public void close() { diff --git a/iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java index f57921be1f9..e4a71a1ef2e 100644 --- a/iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java +++ b/iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java @@ -1262,7 +1262,9 @@ public class SessionPoolTest { 10, true, 10, - ZoneId.systemDefault()); + ZoneId.systemDefault(), + 1000, + false); Mockito.when(session.executeQueryStatement(any(String.class), eq(50))) .thenReturn(sessionDataSet); sessionDataSetWrapper = sessionPool.executeQueryStatement(sql, 50); @@ -1497,7 +1499,9 @@ public class SessionPoolTest { 10, true, 10, - ZoneId.systemDefault()); + ZoneId.systemDefault(), + 1000, + false); Mockito.when(session.executeQueryStatement(any(String.class))).thenReturn(sessionDataSet); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 524e918a141..4187aadb8dc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -223,6 +223,7 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize; +import static org.apache.iotdb.rpc.RpcUtils.TIME_PRECISION; public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @@ -292,6 +293,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long queryId = Long.MIN_VALUE; String statement = req.getStatement(); IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); + // quota OperationQuota quota = null; if (!SESSION_MANAGER.checkLogin(clientSession)) { @@ -1226,7 +1228,10 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { clientSession.setDatabaseName(database.get()); } TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION); - return resp.setSessionId(openSessionResp.getSessionId()); + Map<String, String> configuration = new HashMap<>(); + configuration.put( + TIME_PRECISION, CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); + return resp.setSessionId(openSessionResp.getSessionId()).setConfiguration(configuration); } private IoTDBConstant.ClientVersion parseClientVersion(TSOpenSessionReq req) { @@ -2853,6 +2858,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { resp.setAliasColumns(header.getRespAliasColumns()); resp.setIgnoreTimeStamp(header.isIgnoreTimestamp()); resp.setQueryId(queryId); + resp.setTableModel( + SESSION_MANAGER.getCurrSessionAndUpdateIdleTime().getSqlDialect() + == IClientSession.SqlDialect.TABLE); return resp; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeader.java index 312645b1c03..43f0167eec2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeader.java @@ -30,6 +30,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import static org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING; + /** The header of query result dataset. */ public class DatasetHeader { @@ -158,4 +160,8 @@ public class DatasetHeader { public int hashCode() { return Objects.hash(columnHeaders, isIgnoreTimestamp); } + + public void addTimeColumn() { + columnToTsBlockIndexMap.put(TIMESTAMP_EXPRESSION_STRING, -1); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java index 8d34326e066..d39b1855448 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java @@ -73,7 +73,6 @@ import java.util.List; import java.util.Set; import static java.util.Objects.requireNonNull; -import static org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING; import static org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType; public class LogicalPlanner { @@ -212,9 +211,7 @@ public class LogicalPlanner { Symbol symbol = plan.getSymbol(fieldIndex); outputs.add(symbol); - if (!TIMESTAMP_EXPRESSION_STRING.equalsIgnoreCase(name)) { - columnHeaders.add(new ColumnHeader(symbol.getName(), getTSDataType(field.getType()))); - } + columnHeaders.add(new ColumnHeader(symbol.getName(), getTSDataType(field.getType()))); columnNumber++; } @@ -223,7 +220,7 @@ public class LogicalPlanner { new OutputNode( context.getQueryId().genPlanNodeId(), plan.getRoot(), names.build(), outputs.build()); - DatasetHeader respDatasetHeader = new DatasetHeader(columnHeaders, false); + DatasetHeader respDatasetHeader = new DatasetHeader(columnHeaders, true); analysis.setRespDatasetHeader(respDatasetHeader); return outputNode; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java index b47eb51db94..c1de85b2645 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java @@ -31,11 +31,11 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.Lim import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.TablePlanOptimizer; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; import static org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING; @@ -74,13 +74,30 @@ public class TableDistributionPlanner { new AddExchangeNodes(mppQueryContext) .addExchangeNodes(distributedPlanResult.get(0), planContext); if (analysis.getStatement() instanceof Query) { - analysis - .getRespDatasetHeader() - .setColumnToTsBlockIndexMap( - outputNodeWithExchange.getOutputSymbols().stream() - .map(Symbol::getName) - .filter(e -> !TIMESTAMP_EXPRESSION_STRING.equalsIgnoreCase(e)) - .collect(Collectors.toList())); + // select s1, time, s2 from table; + // List<String> columnNameList {s1, time, s2} + // Map<String, Integer> columnName2TsBlockColumnIndexMap {s1:0, time: -1, s2: 1} + // resultSet.getString(2); + // => String columnName = columnNameList.get(2 - 1); fetch columnName + // => int indexForTsBlockColumn = columnName2TsBlockColumnIndexMap.get(columnName); + // => Column c = indexForTsBlockColumn >= 0 ? + // tsBlock.getValueColumn(indexForTsBlockColumn) : tsBlock.getTimeColumn() + // => c.getString(currentRowIndex); + List<String> outputColumnNames = + new ArrayList<>(outputNodeWithExchange.getOutputSymbols().size()); + boolean hasTimeColumn = false; + for (Symbol column : outputNodeWithExchange.getOutputSymbols()) { + String columnName = column.getName(); + if (!TIMESTAMP_EXPRESSION_STRING.equalsIgnoreCase(columnName)) { + outputColumnNames.add(columnName); + } else { + hasTimeColumn = true; + } + } + analysis.getRespDatasetHeader().setColumnToTsBlockIndexMap(outputColumnNames); + if (hasTimeColumn) { + analysis.getRespDatasetHeader().addTimeColumn(); + } } adjustUpStream(outputNodeWithExchange, planContext); diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift index 9f5c5e7a8ce..ac64873fa4f 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift @@ -72,6 +72,7 @@ struct TSExecuteStatementResp { 14: optional bool moreData // only be set while executing use XXX successfully 15: optional string database + 16: optional bool tableModel } enum TSProtocolVersion {
