This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch ml/windowSet in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7442531210b70324b16ba7ab6784e28cd23e26a3 Author: Minghui Liu <[email protected]> AuthorDate: Tue Nov 1 16:36:15 2022 +0800 implement fetchWindowSet in ClientRPCServiceImpl --- .../service/thrift/impl/ClientRPCServiceImpl.java | 66 ++++++- .../db/service/thrift/impl/TSServiceImpl.java | 6 +- .../apache/iotdb/db/utils/QueryDataSetUtils.java | 203 +++++++++++++++++++++ .../main/java/org/apache/iotdb/rpc/RpcUtils.java | 13 ++ thrift/src/main/thrift/client.thrift | 20 +- 5 files changed, 292 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java index b3d6f41230..24eb5bf432 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java @@ -70,8 +70,6 @@ import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.ServerProperties; -import org.apache.iotdb.service.rpc.thrift.TFetchWindowSetReq; -import org.apache.iotdb.service.rpc.thrift.TFetchWindowSetResp; import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq; import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq; @@ -90,6 +88,8 @@ import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq; import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp; import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq; import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp; +import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetReq; +import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetResp; import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq; @@ -421,8 +421,59 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } @Override - public TFetchWindowSetResp fetchWindowSet(TFetchWindowSetReq req) throws TException { - return null; + public TSFetchWindowSetResp fetchWindowSet(TSFetchWindowSetReq req) throws TException { + if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) { + return RpcUtils.getTSFetchWindowSetResp(getNotLoggedInStatus()); + } + long startTime = System.currentTimeMillis(); + try { + Statement s = + StatementGenerator.createStatement(req, SESSION_MANAGER.getCurrSession().getZoneId()); + + // permission check + TSStatus status = AuthorityChecker.checkAuthority(s, SESSION_MANAGER.getCurrSession()); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return RpcUtils.getTSFetchWindowSetResp(status); + } + + QUERY_FREQUENCY_RECORDER.incrementAndGet(); + AUDIT_LOGGER.debug("Session {} execute fetch window set: {}", req.sessionId, req); + long queryId = + SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId); + // create and cache dataset + ExecutionResult result = + COORDINATOR.execute( + s, + queryId, + SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()), + "", + PARTITION_FETCHER, + SCHEMA_FETCHER, + config.getQueryTimeoutThreshold()); + + if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new RuntimeException("error code: " + result.status); + } + + IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId); + + try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) { + TSFetchWindowSetResp resp = createResponse(queryExecution.getDatasetHeader()); + resp.setStatus(result.status); + resp.setQueryDataSetList(QueryDataSetUtils.convertTsBlocksToWindowSet(queryExecution)); + return resp; + } + } catch (Exception e) { + // TODO call the coordinator to release query resource + return RpcUtils.getTSFetchWindowSetResp( + onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY)); + } finally { + addOperationLatency(Operation.EXECUTE_QUERY, startTime); + long costTime = System.currentTimeMillis() - startTime; + if (costTime >= CONFIG.getSlowQueryThreshold()) { + SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, req); + } + } } @Override @@ -1747,6 +1798,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return resp; } + private TSFetchWindowSetResp createResponse(DatasetHeader header) { + TSFetchWindowSetResp resp = RpcUtils.getTSFetchWindowSetResp(TSStatusCode.SUCCESS_STATUS); + resp.setColumns(header.getRespColumns()); + resp.setDataTypeList(header.getRespDataTypeList()); + return resp; + } + private TSStatus getNotLoggedInStatus() { return RpcUtils.getStatus( TSStatusCode.NOT_LOGIN_ERROR, diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java index 06ef9eec23..9dd837b131 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java @@ -85,8 +85,6 @@ import org.apache.iotdb.rpc.RedirectException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.ServerProperties; -import org.apache.iotdb.service.rpc.thrift.TFetchWindowSetReq; -import org.apache.iotdb.service.rpc.thrift.TFetchWindowSetResp; import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq; import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq; @@ -105,6 +103,8 @@ import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq; import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp; import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq; import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp; +import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetReq; +import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetResp; import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq; @@ -279,7 +279,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler { } @Override - public TFetchWindowSetResp fetchWindowSet(TFetchWindowSetReq req) throws TException { + public TSFetchWindowSetResp fetchWindowSet(TSFetchWindowSetReq req) throws TException { return null; } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java index ca603bf535..50ae6ad575 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java @@ -400,6 +400,209 @@ public class QueryDataSetUtils { return res; } + public static List<TSQueryDataSet> convertTsBlocksToWindowSet(IQueryExecution queryExecution) + throws IoTDBException, IOException { + List<TSQueryDataSet> windowSet = new ArrayList<>(); + + int columnNum = queryExecution.getOutputValueColumnCount(); + // one time column and each value column has an actual value buffer and a bitmap value to + // indicate whether it is a null + int columnNumWithTime = columnNum * 2 + 1; + + while (true) { + Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult(); + if (!optionalTsBlock.isPresent()) { + break; + } + TsBlock tsBlock = optionalTsBlock.get(); + if (tsBlock.isEmpty()) { + continue; + } + + TSQueryDataSet tsQueryDataSet = new TSQueryDataSet(); + + DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime]; + ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime]; + for (int i = 0; i < columnNumWithTime; i++) { + byteArrayOutputStreams[i] = new ByteArrayOutputStream(); + dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]); + } + + int rowCount = 0; + int[] valueOccupation = new int[columnNum]; + + // used to record a bitmap for every 8 points + int[] bitmaps = new int[columnNum]; + + int currentCount = tsBlock.getPositionCount(); + // serialize time column + for (int i = 0; i < currentCount; i++) { + // use columnOutput to write byte array + dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i)); + } + + // serialize each value column and its bitmap + for (int k = 0; k < columnNum; k++) { + // get DataOutputStream for current value column and its bitmap + DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1]; + DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)]; + + Column column = tsBlock.getColumn(k); + TSDataType type = column.getDataType(); + switch (type) { + case INT32: + for (int i = 0; i < currentCount; i++) { + rowCount++; + if (column.isNull(i)) { + bitmaps[k] = bitmaps[k] << 1; + } else { + bitmaps[k] = (bitmaps[k] << 1) | FLAG; + dataOutputStream.writeInt(column.getInt(i)); + valueOccupation[k] += 4; + } + if (rowCount != 0 && rowCount % 8 == 0) { + dataBitmapOutputStream.writeByte(bitmaps[k]); + // we should clear the bitmap every 8 points + bitmaps[k] = 0; + } + } + break; + case INT64: + for (int i = 0; i < currentCount; i++) { + rowCount++; + if (column.isNull(i)) { + bitmaps[k] = bitmaps[k] << 1; + } else { + bitmaps[k] = (bitmaps[k] << 1) | FLAG; + dataOutputStream.writeLong(column.getLong(i)); + valueOccupation[k] += 8; + } + if (rowCount != 0 && rowCount % 8 == 0) { + dataBitmapOutputStream.writeByte(bitmaps[k]); + // we should clear the bitmap every 8 points + bitmaps[k] = 0; + } + } + break; + case FLOAT: + for (int i = 0; i < currentCount; i++) { + rowCount++; + if (column.isNull(i)) { + bitmaps[k] = bitmaps[k] << 1; + } else { + bitmaps[k] = (bitmaps[k] << 1) | FLAG; + dataOutputStream.writeFloat(column.getFloat(i)); + valueOccupation[k] += 4; + } + if (rowCount != 0 && rowCount % 8 == 0) { + dataBitmapOutputStream.writeByte(bitmaps[k]); + // we should clear the bitmap every 8 points + bitmaps[k] = 0; + } + } + break; + case DOUBLE: + for (int i = 0; i < currentCount; i++) { + rowCount++; + if (column.isNull(i)) { + bitmaps[k] = bitmaps[k] << 1; + } else { + bitmaps[k] = (bitmaps[k] << 1) | FLAG; + dataOutputStream.writeDouble(column.getDouble(i)); + valueOccupation[k] += 8; + } + if (rowCount != 0 && rowCount % 8 == 0) { + dataBitmapOutputStream.writeByte(bitmaps[k]); + // we should clear the bitmap every 8 points + bitmaps[k] = 0; + } + } + break; + case BOOLEAN: + for (int i = 0; i < currentCount; i++) { + rowCount++; + if (column.isNull(i)) { + bitmaps[k] = bitmaps[k] << 1; + } else { + bitmaps[k] = (bitmaps[k] << 1) | FLAG; + dataOutputStream.writeBoolean(column.getBoolean(i)); + valueOccupation[k] += 1; + } + if (rowCount != 0 && rowCount % 8 == 0) { + dataBitmapOutputStream.writeByte(bitmaps[k]); + // we should clear the bitmap every 8 points + bitmaps[k] = 0; + } + } + break; + case TEXT: + for (int i = 0; i < currentCount; i++) { + rowCount++; + if (column.isNull(i)) { + bitmaps[k] = bitmaps[k] << 1; + } else { + bitmaps[k] = (bitmaps[k] << 1) | FLAG; + Binary binary = column.getBinary(i); + dataOutputStream.writeInt(binary.getLength()); + dataOutputStream.write(binary.getValues()); + valueOccupation[k] = valueOccupation[k] + 4 + binary.getLength(); + } + if (rowCount != 0 && rowCount % 8 == 0) { + dataBitmapOutputStream.writeByte(bitmaps[k]); + // we should clear the bitmap every 8 points + bitmaps[k] = 0; + } + } + break; + default: + throw new UnSupportedDataTypeException( + String.format("Data type %s is not supported.", type)); + } + if (k != columnNum - 1) { + rowCount -= currentCount; + } + } + + // feed the remaining bitmap + int remaining = rowCount % 8; + for (int k = 0; k < columnNum; k++) { + if (remaining != 0) { + DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)]; + dataBitmapOutputStream.writeByte(bitmaps[k] << (8 - remaining)); + } + } + + // calculate the time buffer size + int timeOccupation = rowCount * 8; + ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation); + timeBuffer.put(byteArrayOutputStreams[0].toByteArray()); + timeBuffer.flip(); + tsQueryDataSet.setTime(timeBuffer); + + // calculate the bitmap buffer size + int bitmapOccupation = (rowCount + 7) / 8; + + List<ByteBuffer> bitmapList = new LinkedList<>(); + List<ByteBuffer> valueList = new LinkedList<>(); + for (int i = 1; i < byteArrayOutputStreams.length; i += 2) { + ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 2]); + valueBuffer.put(byteArrayOutputStreams[i].toByteArray()); + valueBuffer.flip(); + valueList.add(valueBuffer); + + ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation); + bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray()); + bitmapBuffer.flip(); + bitmapList.add(bitmapBuffer); + } + tsQueryDataSet.setBitmapList(bitmapList); + tsQueryDataSet.setValueList(valueList); + + windowSet.add(tsQueryDataSet); + } + return windowSet; + } + public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) { long[] times = new long[size]; for (int i = 0; i < size; i++) { diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java index 89c9a21c4a..cbedcf75e2 100644 --- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java +++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java @@ -25,6 +25,7 @@ import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxTSStatus; import org.apache.iotdb.service.rpc.thrift.IClientRPCService; import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp; import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp; +import org.apache.iotdb.service.rpc.thrift.TSFetchWindowSetResp; import java.lang.reflect.Proxy; import java.text.SimpleDateFormat; @@ -225,6 +226,18 @@ public class RpcUtils { return resp; } + public static TSFetchWindowSetResp getTSFetchWindowSetResp(TSStatus status) { + TSFetchWindowSetResp resp = new TSFetchWindowSetResp(); + TSStatus tsStatus = new TSStatus(status); + resp.setStatus(tsStatus); + return resp; + } + + public static TSFetchWindowSetResp getTSFetchWindowSetResp(TSStatusCode tsStatusCode) { + TSStatus status = getStatus(tsStatusCode); + return getTSFetchWindowSetResp(status); + } + public static final String DEFAULT_TIME_FORMAT = "default"; public static final String DEFAULT_TIMESTAMP_PRECISION = "ms"; diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift index 51b0d24c29..4f956b0173 100644 --- a/thrift/src/main/thrift/client.thrift +++ b/thrift/src/main/thrift/client.thrift @@ -421,17 +421,19 @@ struct TGroupByTimeParameter { 5: required list<i32> indexes } -struct TFetchWindowSetReq { +struct TSFetchWindowSetReq { 1: required i64 sessionId - 2: required list<string> queryPaths - 3: optional string functionName - 4: required TGroupByTimeParameter groupByTimeParameter + 2: required i64 statementId + 3: required list<string> queryPaths + 4: optional string functionName + 5: required TGroupByTimeParameter groupByTimeParameter } -struct TFetchWindowSetResp { - 1: required list<string> columns - 2: required list<string> dataTypeList - 3: required list<TSQueryDataSet> queryDataSetList +struct TSFetchWindowSetResp { + 1: required common.TSStatus status + 2: required list<string> columns + 3: required list<string> dataTypeList + 4: required list<TSQueryDataSet> queryDataSetList } // The sender and receiver need to check some info to confirm validity @@ -582,5 +584,5 @@ service IClientRPCService { TSConnectionInfoResp fetchAllConnectionsInfo(); - TFetchWindowSetResp fetchWindowSet(1:TFetchWindowSetReq req) + TSFetchWindowSetResp fetchWindowSet(1:TSFetchWindowSetReq req); } \ No newline at end of file
