This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch ml/windowSet in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit db93dfb3f79efcc3d7e9ff7303daeb7c85ee03c2 Author: Minghui Liu <[email protected]> AuthorDate: Tue Nov 1 17:13:00 2022 +0800 implement java session --- .../service/thrift/impl/ClientRPCServiceImpl.java | 14 +- .../apache/iotdb/db/utils/QueryDataSetUtils.java | 207 ++------------------- .../java/org/apache/iotdb/session/ISession.java | 10 + .../java/org/apache/iotdb/session/Session.java | 14 ++ .../apache/iotdb/session/SessionConnection.java | 45 +++++ thrift/src/main/thrift/client.thrift | 8 +- 6 files changed, 98 insertions(+), 200 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java index 24eb5bf432..1d7d875999 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java @@ -458,9 +458,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId); try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) { - TSFetchWindowSetResp resp = createResponse(queryExecution.getDatasetHeader()); - resp.setStatus(result.status); - resp.setQueryDataSetList(QueryDataSetUtils.convertTsBlocksToWindowSet(queryExecution)); + TSFetchWindowSetResp resp = + createTSFetchWindowSetResp(queryExecution.getDatasetHeader(), queryId); + resp.setQueryResultList(QueryDataSetUtils.convertTsBlocksToWindowSet(queryExecution)); return resp; } } catch (Exception e) { @@ -1798,10 +1798,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return resp; } - private TSFetchWindowSetResp createResponse(DatasetHeader header) { + private TSFetchWindowSetResp createTSFetchWindowSetResp(DatasetHeader header, long queryId) { TSFetchWindowSetResp resp = RpcUtils.getTSFetchWindowSetResp(TSStatusCode.SUCCESS_STATUS); - resp.setColumns(header.getRespColumns()); - resp.setDataTypeList(header.getRespDataTypeList()); + resp.setColumnNameList(header.getRespColumns()); + resp.setColumnTypeList(header.getRespDataTypeList()); + resp.setColumnNameIndexMap(header.getColumnNameIndexMap()); + resp.setQueryId(queryId); return resp; } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java index 50ae6ad575..9ac3cbd404 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java @@ -400,205 +400,30 @@ public class QueryDataSetUtils { return res; } - public static List<TSQueryDataSet> convertTsBlocksToWindowSet(IQueryExecution queryExecution) - throws IoTDBException, IOException { - List<TSQueryDataSet> windowSet = new ArrayList<>(); - - int columnNum = queryExecution.getOutputValueColumnCount(); - // one time column and each value column has an actual value buffer and a bitmap value to - // indicate whether it is a null - int columnNumWithTime = columnNum * 2 + 1; + public static List<List<ByteBuffer>> convertTsBlocksToWindowSet(IQueryExecution queryExecution) + throws IoTDBException { + List<List<ByteBuffer>> windowSet = new ArrayList<>(); while (true) { - Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult(); - if (!optionalTsBlock.isPresent()) { + Optional<ByteBuffer> optionalByteBuffer = queryExecution.getByteBufferBatchResult(); + if (!optionalByteBuffer.isPresent()) { break; } - TsBlock tsBlock = optionalTsBlock.get(); - if (tsBlock.isEmpty()) { - continue; - } - - TSQueryDataSet tsQueryDataSet = new TSQueryDataSet(); - - DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime]; - ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime]; - for (int i = 0; i < columnNumWithTime; i++) { - byteArrayOutputStreams[i] = new ByteArrayOutputStream(); - dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]); - } - - int rowCount = 0; - int[] valueOccupation = new int[columnNum]; - - // used to record a bitmap for every 8 points - int[] bitmaps = new int[columnNum]; - - int currentCount = tsBlock.getPositionCount(); - // serialize time column - for (int i = 0; i < currentCount; i++) { - // use columnOutput to write byte array - dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i)); - } - - // serialize each value column and its bitmap - for (int k = 0; k < columnNum; k++) { - // get DataOutputStream for current value column and its bitmap - DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1]; - DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)]; - Column column = tsBlock.getColumn(k); - TSDataType type = column.getDataType(); - switch (type) { - case INT32: - for (int i = 0; i < currentCount; i++) { - rowCount++; - if (column.isNull(i)) { - bitmaps[k] = bitmaps[k] << 1; - } else { - bitmaps[k] = (bitmaps[k] << 1) | FLAG; - dataOutputStream.writeInt(column.getInt(i)); - valueOccupation[k] += 4; - } - if (rowCount != 0 && rowCount % 8 == 0) { - dataBitmapOutputStream.writeByte(bitmaps[k]); - // we should clear the bitmap every 8 points - bitmaps[k] = 0; - } - } - break; - case INT64: - for (int i = 0; i < currentCount; i++) { - rowCount++; - if (column.isNull(i)) { - bitmaps[k] = bitmaps[k] << 1; - } else { - bitmaps[k] = (bitmaps[k] << 1) | FLAG; - dataOutputStream.writeLong(column.getLong(i)); - valueOccupation[k] += 8; - } - if (rowCount != 0 && rowCount % 8 == 0) { - dataBitmapOutputStream.writeByte(bitmaps[k]); - // we should clear the bitmap every 8 points - bitmaps[k] = 0; - } - } - break; - case FLOAT: - for (int i = 0; i < currentCount; i++) { - rowCount++; - if (column.isNull(i)) { - bitmaps[k] = bitmaps[k] << 1; - } else { - bitmaps[k] = (bitmaps[k] << 1) | FLAG; - dataOutputStream.writeFloat(column.getFloat(i)); - valueOccupation[k] += 4; - } - if (rowCount != 0 && rowCount % 8 == 0) { - dataBitmapOutputStream.writeByte(bitmaps[k]); - // we should clear the bitmap every 8 points - bitmaps[k] = 0; - } - } - break; - case DOUBLE: - for (int i = 0; i < currentCount; i++) { - rowCount++; - if (column.isNull(i)) { - bitmaps[k] = bitmaps[k] << 1; - } else { - bitmaps[k] = (bitmaps[k] << 1) | FLAG; - dataOutputStream.writeDouble(column.getDouble(i)); - valueOccupation[k] += 8; - } - if (rowCount != 0 && rowCount % 8 == 0) { - dataBitmapOutputStream.writeByte(bitmaps[k]); - // we should clear the bitmap every 8 points - bitmaps[k] = 0; - } - } - break; - case BOOLEAN: - for (int i = 0; i < currentCount; i++) { - rowCount++; - if (column.isNull(i)) { - bitmaps[k] = bitmaps[k] << 1; - } else { - bitmaps[k] = (bitmaps[k] << 1) | FLAG; - dataOutputStream.writeBoolean(column.getBoolean(i)); - valueOccupation[k] += 1; - } - if (rowCount != 0 && rowCount % 8 == 0) { - dataBitmapOutputStream.writeByte(bitmaps[k]); - // we should clear the bitmap every 8 points - bitmaps[k] = 0; - } - } - break; - case TEXT: - for (int i = 0; i < currentCount; i++) { - rowCount++; - if (column.isNull(i)) { - bitmaps[k] = bitmaps[k] << 1; - } else { - bitmaps[k] = (bitmaps[k] << 1) | FLAG; - Binary binary = column.getBinary(i); - dataOutputStream.writeInt(binary.getLength()); - dataOutputStream.write(binary.getValues()); - valueOccupation[k] = valueOccupation[k] + 4 + binary.getLength(); - } - if (rowCount != 0 && rowCount % 8 == 0) { - dataBitmapOutputStream.writeByte(bitmaps[k]); - // we should clear the bitmap every 8 points - bitmaps[k] = 0; - } - } - break; - default: - throw new UnSupportedDataTypeException( - String.format("Data type %s is not supported.", type)); - } - if (k != columnNum - 1) { - rowCount -= currentCount; - } - } - - // feed the remaining bitmap - int remaining = rowCount % 8; - for (int k = 0; k < columnNum; k++) { - if (remaining != 0) { - DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)]; - dataBitmapOutputStream.writeByte(bitmaps[k] << (8 - remaining)); - } + List<ByteBuffer> res = new ArrayList<>(); + ByteBuffer byteBuffer = optionalByteBuffer.get(); + byteBuffer.mark(); + int valueColumnCount = byteBuffer.getInt(); + for (int i = 0; i < valueColumnCount; i++) { + byteBuffer.get(); } - - // calculate the time buffer size - int timeOccupation = rowCount * 8; - ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation); - timeBuffer.put(byteArrayOutputStreams[0].toByteArray()); - timeBuffer.flip(); - tsQueryDataSet.setTime(timeBuffer); - - // calculate the bitmap buffer size - int bitmapOccupation = (rowCount + 7) / 8; - - List<ByteBuffer> bitmapList = new LinkedList<>(); - List<ByteBuffer> valueList = new LinkedList<>(); - for (int i = 1; i < byteArrayOutputStreams.length; i += 2) { - ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 2]); - valueBuffer.put(byteArrayOutputStreams[i].toByteArray()); - valueBuffer.flip(); - valueList.add(valueBuffer); - - ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation); - bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray()); - bitmapBuffer.flip(); - bitmapList.add(bitmapBuffer); + int positionCount = byteBuffer.getInt(); + byteBuffer.reset(); + if (positionCount != 0) { + res.add(byteBuffer); } - tsQueryDataSet.setBitmapList(bitmapList); - tsQueryDataSet.setValueList(valueList); - windowSet.add(tsQueryDataSet); + windowSet.add(res); } return windowSet; } diff --git a/session/src/main/java/org/apache/iotdb/session/ISession.java b/session/src/main/java/org/apache/iotdb/session/ISession.java index 3f20aa228d..1b140e0d9f 100644 --- a/session/src/main/java/org/apache/iotdb/session/ISession.java +++ b/session/src/main/java/org/apache/iotdb/session/ISession.java @@ -440,4 +440,14 @@ public interface ISession extends AutoCloseable { void sortTablet(Tablet tablet); TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException; + + List<SessionDataSet> fetchWindowSet( + List<String> queryPaths, + String functionName, + long startTime, + long endTime, + long interval, + long slidingStep, + List<Integer> indexes) + throws StatementExecutionException; } diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java index fd18e8a746..68e35b367b 100644 --- a/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/session/src/main/java/org/apache/iotdb/session/Session.java @@ -3263,6 +3263,20 @@ public class Session implements ISession { return defaultSessionConnection.fetchAllConnections(); } + @Override + public List<SessionDataSet> fetchWindowSet( + List<String> queryPaths, + String functionName, + long startTime, + long endTime, + long interval, + long slidingStep, + List<Integer> indexes) + throws StatementExecutionException { + return defaultSessionConnection.fetchWindowSet( + queryPaths, functionName, startTime, endTime, interval, slidingStep, indexes); + } + public static class Builder { private String host = SessionConfig.DEFAULT_HOST; private int rpcPort = SessionConfig.DEFAULT_PORT; diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java index 630f6ffdeb..5f24969113 100644 --- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java +++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java @@ -27,6 +27,7 @@ import org.apache.iotdb.rpc.RpcTransportFactory; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.service.rpc.thrift.IClientRPCService; +import org.apache.iotdb.service.rpc.thrift.TGroupByTimeParameter; import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq; import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp; @@ -38,6 +39,8 @@ import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq; import org.apache.iotdb.service.rpc.thrift.TSDropSchemaTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq; import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp; +import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetReq; +import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetResp; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq; @@ -66,6 +69,7 @@ import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.ByteBuffer; import java.time.ZoneId; import java.util.ArrayList; import java.util.List; @@ -481,6 +485,47 @@ public class SessionConnection { tsExecuteStatementResp.isIgnoreTimeStamp()); } + public List<SessionDataSet> fetchWindowSet( + List<String> queryPaths, + String functionName, + long startTime, + long endTime, + long interval, + long slidingStep, + List<Integer> indexes) + throws StatementExecutionException { + TSFetchWindowSetReq req = + new TSFetchWindowSetReq( + sessionId, + statementId, + queryPaths, + new TGroupByTimeParameter(startTime, endTime, interval, slidingStep, indexes)); + TSFetchWindowSetResp resp; + try { + resp = client.fetchWindowSet(req); + RpcUtils.verifySuccess(resp.getStatus()); + } catch (TException e) { + throw new StatementExecutionException(""); + } + + List<SessionDataSet> windowSet = new ArrayList<>(); + for (List<ByteBuffer> queryResult : resp.getQueryResultList()) { + windowSet.add( + new SessionDataSet( + "", + resp.columnNameList, + resp.columnTypeList, + resp.columnNameIndexMap, + resp.queryId, + statementId, + client, + sessionId, + queryResult, + false)); + } + return windowSet; + } + protected void insertRecord(TSInsertRecordReq request) throws IoTDBConnectionException, StatementExecutionException, RedirectException { request.setSessionId(sessionId); diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift index 4f956b0173..429853e123 100644 --- a/thrift/src/main/thrift/client.thrift +++ b/thrift/src/main/thrift/client.thrift @@ -431,9 +431,11 @@ struct TSFetchWindowSetReq { struct TSFetchWindowSetResp { 1: required common.TSStatus status - 2: required list<string> columns - 3: required list<string> dataTypeList - 4: required list<TSQueryDataSet> queryDataSetList + 2: required i64 queryId + 3: required list<string> columnNameList + 4: required list<string> columnTypeList + 5: required map<string, i32> columnNameIndexMap + 6: required list<list<binary>> queryResultList } // The sender and receiver need to check some info to confirm validity
