This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch benchants_branch in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9cced1296b03dcd76d4bd4a43a181869d18699fd Author: Beyyes <[email protected]> AuthorDate: Wed Jun 7 18:17:01 2023 +0800 add executeGroupByQueryIntervalQuery rpc interface --- .../thrift/src/main/thrift/client.thrift | 11 +- .../service/thrift/impl/ClientRPCServiceImpl.java | 82 +++++++-- .../apache/iotdb/db/utils/QueryDataSetUtils.java | 191 +++++++++++++++++++++ .../iotdb/tsfile/read/filter/TimeFilter.java | 2 +- 4 files changed, 267 insertions(+), 19 deletions(-) diff --git a/iotdb-protocol/thrift/src/main/thrift/client.thrift b/iotdb-protocol/thrift/src/main/thrift/client.thrift index ec35e8f5020..f5837ec84d3 100644 --- a/iotdb-protocol/thrift/src/main/thrift/client.thrift +++ b/iotdb-protocol/thrift/src/main/thrift/client.thrift @@ -359,11 +359,12 @@ struct TSGroupByQueryIntervalReq { 4: required string measurement 5: required i32 dataType 6: required common.TAggregationType aggregationType - 7: optional i64 startTime - 8: optional i64 endTime - 9: optional i64 interval - 10: optional i32 fetchSize - 11: optional i64 timeout + 7: optional string database + 8: optional i64 startTime + 9: optional i64 endTime + 10: optional i64 interval + 11: optional i32 fetchSize + 12: optional i64 timeout } struct TSCreateMultiTimeseriesReq { diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java index 5c3cba12996..22648b62d4d 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java @@ -18,13 +18,19 @@ */ package org.apache.iotdb.db.service.thrift.impl; +import io.jsonwebtoken.lang.Strings; +import org.apache.commons.lang.StringUtils; import org.apache.iotdb.common.rpc.thrift.TAggregationType; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.partition.DataPartition; +import org.apache.iotdb.commons.partition.DataPartitionQueryParam; import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Metric; @@ -44,6 +50,7 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.common.PlanFragmentId; import org.apache.iotdb.db.mpp.common.QueryId; import org.apache.iotdb.db.mpp.common.SessionInfo; +import org.apache.iotdb.db.mpp.common.header.ColumnHeader; import org.apache.iotdb.db.mpp.common.header.DatasetHeader; import org.apache.iotdb.db.mpp.execution.driver.DriverContext; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; @@ -86,6 +93,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.DropSchemaTempla import org.apache.iotdb.db.mpp.plan.statement.metadata.template.SetSchemaTemplateStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTemplateStatement; import org.apache.iotdb.db.pipe.agent.PipeAgent; +import org.apache.iotdb.db.protocol.rest.StringUtil; import org.apache.iotdb.db.query.control.SessionManager; import org.apache.iotdb.db.query.control.clientsession.IClientSession; import org.apache.iotdb.db.quotas.DataNodeThrottleQuotaManager; @@ -94,6 +102,7 @@ import org.apache.iotdb.db.service.basic.BasicOpenSessionResp; import org.apache.iotdb.db.sync.SyncService; import org.apache.iotdb.db.utils.QueryDataSetUtils; import org.apache.iotdb.db.utils.SetThreadName; +import org.apache.iotdb.db.utils.TimePartitionUtils; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -164,9 +173,11 @@ import java.nio.ByteBuffer; import java.time.ZoneId; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; @@ -174,6 +185,7 @@ import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTim import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; +import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize; import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @@ -193,6 +205,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { public static Duration DEFAULT_TIME_SLICE = new Duration(60_000, TimeUnit.MILLISECONDS); + private static final Semaphore querySemaphore = + new Semaphore(Runtime.getRuntime().availableProcessors() * 2); + private final IPartitionFetcher partitionFetcher; private final ISchemaFetcher schemaFetcher; @@ -213,8 +228,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { private static final SelectResult OLD_SELECT_RESULT = (resp, queryExecution, fetchSize) -> { - Pair<TSQueryDataSet, Boolean> pair = - QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, fetchSize); + Pair<TSQueryDataSet, Boolean> pair = convertTsBlockByFetchSize(queryExecution, fetchSize); resp.setQueryDataSet(pair.left); return pair.right; }; @@ -1262,7 +1276,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) { Pair<TSQueryDataSet, Boolean> pair = - QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize); + convertTsBlockByFetchSize(queryExecution, req.fetchSize); TSQueryDataSet result = pair.left; finished = pair.right; boolean hasResultSet = result.bufferForTime().limit() != 0; @@ -1848,28 +1862,70 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSExecuteStatementResp executeGroupByQueryIntervalQuery(TSGroupByQueryIntervalReq req) throws TException { + try { - IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); + querySemaphore.acquire(); - DataRegionId dataRegionId = new DataRegionId(5); - List<DataRegion> dataRegionList = null; - StorageEngine.getInstance().getDataRegion(dataRegionId); + IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); - List<TsBlock> ret = + String database = req.getDatabase(); + if (StringUtils.isEmpty(database)) { + String[] splits = Strings.split(req.getDevice(), "\\."); + database = String.format("%s.%s", splits[0], splits[1]); + } + String deviceId = req.getDevice(); + String measurementId = req.getMeasurement(); + TSDataType dataType = TSDataType.getTsDataType((byte) req.getDataType()); + + // only one database, one device, one time interval + Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>(); + TTimePartitionSlot timePartitionSlot = + TimePartitionUtils.getTimePartition(req.getStartTime()); + DataPartitionQueryParam queryParam = + new DataPartitionQueryParam( + deviceId, Collections.singletonList(timePartitionSlot), false, false); + sgNameToQueryParamsMap.put(database, Collections.singletonList(queryParam)); + DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap); + List<DataRegion> dataRegionList = new ArrayList<>(); + List<TRegionReplicaSet> replicaSets = + dataPartition.getDataRegionReplicaSet( + deviceId, Collections.singletonList(timePartitionSlot)); + for (TRegionReplicaSet region : replicaSets) { + dataRegionList.add( + StorageEngine.getInstance() + .getDataRegion(new DataRegionId(region.getRegionId().getId()))); + } + + List<TsBlock> blockResult = executeGroupByQueryInternal( SESSION_MANAGER.getSessionInfo(clientSession), - req.getDevice(), - req.getMeasurement(), - TSDataType.getTsDataType((byte) 2), + deviceId, + measurementId, + dataType, req.getStartTime(), req.getEndTime(), req.getInterval(), req.getAggregationType(), dataRegionList); - } catch (Exception e) { + String outputColumnName = req.getAggregationType().name(); + List<ColumnHeader> columnHeaders = + Collections.singletonList(new ColumnHeader(outputColumnName, dataType)); + DatasetHeader header = new DatasetHeader(columnHeaders, false); + header.setColumnToTsBlockIndexMap(Collections.singletonList(outputColumnName)); + + TSExecuteStatementResp resp = createResponse(header, 1); + TSQueryDataSet queryDataSet = convertTsBlockByFetchSize(blockResult); + resp.setQueryDataSet(queryDataSet); + + return resp; + } catch (Exception e) { + return RpcUtils.getTSExecuteStatementResp( + onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_AGG_QUERY)); + } finally { + querySemaphore.release(); + SESSION_MANAGER.updateIdleTime(); } - return null; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java index 6baee727383..9aa5c0b8481 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java @@ -244,6 +244,197 @@ public class QueryDataSetUtils { return new Pair<>(tsQueryDataSet, finished); } + public static TSQueryDataSet convertTsBlockByFetchSize(List<TsBlock> tsBlocks) + throws IOException { + TSQueryDataSet tsQueryDataSet = new TSQueryDataSet(); + + // one time column and each value column has an actual value buffer and a bitmap value to + // indicate whether it is a null + int columnNum = 1; + int columnNumWithTime = columnNum * 2 + 1; + DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime]; + ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime]; + for (int i = 0; i < columnNumWithTime; i++) { + byteArrayOutputStreams[i] = new ByteArrayOutputStream(); + dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]); + } + + int rowCount = 0; + int[] valueOccupation = new int[columnNum]; + + // used to record a bitmap for every 8 points + int[] bitmaps = new int[columnNum]; + for (TsBlock tsBlock : tsBlocks) { + if (tsBlock.isEmpty()) { + continue; + } + + int currentCount = tsBlock.getPositionCount(); + // serialize time column + for (int i = 0; i < currentCount; i++) { + // use columnOutput to write byte array + dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i)); + } + + // serialize each value column and its bitmap + for (int k = 0; k < columnNum; k++) { + // get DataOutputStream for current value column and its bitmap + DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1]; + DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)]; + + Column column = tsBlock.getColumn(k); + TSDataType type = column.getDataType(); + switch (type) { + case INT32: + for (int i = 0; i < currentCount; i++) { + rowCount++; + if (column.isNull(i)) { + bitmaps[k] = bitmaps[k] << 1; + } else { + bitmaps[k] = (bitmaps[k] << 1) | FLAG; + dataOutputStream.writeInt(column.getInt(i)); + valueOccupation[k] += 4; + } + if (rowCount != 0 && rowCount % 8 == 0) { + dataBitmapOutputStream.writeByte(bitmaps[k]); + // we should clear the bitmap every 8 points + bitmaps[k] = 0; + } + } + break; + case INT64: + for (int i = 0; i < currentCount; i++) { + rowCount++; + if (column.isNull(i)) { + bitmaps[k] = bitmaps[k] << 1; + } else { + bitmaps[k] = (bitmaps[k] << 1) | FLAG; + dataOutputStream.writeLong(column.getLong(i)); + valueOccupation[k] += 8; + } + if (rowCount != 0 && rowCount % 8 == 0) { + dataBitmapOutputStream.writeByte(bitmaps[k]); + // we should clear the bitmap every 8 points + bitmaps[k] = 0; + } + } + break; + case FLOAT: + for (int i = 0; i < currentCount; i++) { + rowCount++; + if (column.isNull(i)) { + bitmaps[k] = bitmaps[k] << 1; + } else { + bitmaps[k] = (bitmaps[k] << 1) | FLAG; + dataOutputStream.writeFloat(column.getFloat(i)); + valueOccupation[k] += 4; + } + if (rowCount != 0 && rowCount % 8 == 0) { + dataBitmapOutputStream.writeByte(bitmaps[k]); + // we should clear the bitmap every 8 points + bitmaps[k] = 0; + } + } + break; + case DOUBLE: + for (int i = 0; i < currentCount; i++) { + rowCount++; + if (column.isNull(i)) { + bitmaps[k] = bitmaps[k] << 1; + } else { + bitmaps[k] = (bitmaps[k] << 1) | FLAG; + dataOutputStream.writeDouble(column.getDouble(i)); + valueOccupation[k] += 8; + } + if (rowCount != 0 && rowCount % 8 == 0) { + dataBitmapOutputStream.writeByte(bitmaps[k]); + // we should clear the bitmap every 8 points + bitmaps[k] = 0; + } + } + break; + case BOOLEAN: + for (int i = 0; i < currentCount; i++) { + rowCount++; + if (column.isNull(i)) { + bitmaps[k] = bitmaps[k] << 1; + } else { + bitmaps[k] = (bitmaps[k] << 1) | FLAG; + dataOutputStream.writeBoolean(column.getBoolean(i)); + valueOccupation[k] += 1; + } + if (rowCount != 0 && rowCount % 8 == 0) { + dataBitmapOutputStream.writeByte(bitmaps[k]); + // we should clear the bitmap every 8 points + bitmaps[k] = 0; + } + } + break; + case TEXT: + for (int i = 0; i < currentCount; i++) { + rowCount++; + if (column.isNull(i)) { + bitmaps[k] = bitmaps[k] << 1; + } else { + bitmaps[k] = (bitmaps[k] << 1) | FLAG; + Binary binary = column.getBinary(i); + dataOutputStream.writeInt(binary.getLength()); + dataOutputStream.write(binary.getValues()); + valueOccupation[k] = valueOccupation[k] + 4 + binary.getLength(); + } + if (rowCount != 0 && rowCount % 8 == 0) { + dataBitmapOutputStream.writeByte(bitmaps[k]); + // we should clear the bitmap every 8 points + bitmaps[k] = 0; + } + } + break; + default: + throw new UnSupportedDataTypeException( + String.format("Data type %s is not supported.", type)); + } + if (k != columnNum - 1) { + rowCount -= currentCount; + } + } + } + // feed the remaining bitmap + int remaining = rowCount % 8; + for (int k = 0; k < columnNum; k++) { + if (remaining != 0) { + DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)]; + dataBitmapOutputStream.writeByte(bitmaps[k] << (8 - remaining)); + } + } + + // calculate the time buffer size + int timeOccupation = rowCount * 8; + ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation); + timeBuffer.put(byteArrayOutputStreams[0].toByteArray()); + timeBuffer.flip(); + tsQueryDataSet.setTime(timeBuffer); + + // calculate the bitmap buffer size + int bitmapOccupation = (rowCount + 7) / 8; + + List<ByteBuffer> bitmapList = new LinkedList<>(); + List<ByteBuffer> valueList = new LinkedList<>(); + for (int i = 1; i < byteArrayOutputStreams.length; i += 2) { + ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 2]); + valueBuffer.put(byteArrayOutputStreams[i].toByteArray()); + valueBuffer.flip(); + valueList.add(valueBuffer); + + ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation); + bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray()); + bitmapBuffer.flip(); + bitmapList.add(bitmapBuffer); + } + tsQueryDataSet.setBitmapList(bitmapList); + tsQueryDataSet.setValueList(valueList); + return tsQueryDataSet; + } + /** pair.left is serialized TsBlock pair.right indicates if the query finished */ // To fetch required amounts of data and combine them through List public static Pair<List<ByteBuffer>, Boolean> convertQueryResultByFetchSize( diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java index cda1762e917..b42f5e7ca3b 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java @@ -253,7 +253,7 @@ public class TimeFilter { @Override public boolean satisfyStartEndTime(long startTime, long endTime) { - return !(startTime < this.startTime || endTime >= this.endTime); + return !(endTime < this.startTime || startTime >= this.endTime); } @Override
