This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/1.2_add_group_api in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 631d56a0d3516f7286b3bdd9fa4b3ba9759b7d7b Author: Beyyes <[email protected]> AuthorDate: Mon Jul 17 19:57:47 2023 +0800 add groupby api --- .../protocol/thrift/impl/ClientRPCServiceImpl.java | 231 ++++++++++++++++++++- .../fragment/FragmentInstanceContext.java | 30 ++- .../fragment/FragmentInstanceManager.java | 2 +- .../apache/iotdb/db/utils/QueryDataSetUtils.java | 191 +++++++++++++++++ .../iotdb/tsfile/read/filter/TimeFilter.java | 89 ++++++++ .../read/filter/factory/FilterSerializeId.java | 3 +- .../thrift/src/main/thrift/client.thrift | 18 ++ 7 files changed, 558 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 80f35f49424..7b42bb0896e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -19,11 +19,20 @@ package org.apache.iotdb.db.protocol.thrift.impl; +import org.apache.iotdb.common.rpc.thrift.TAggregationType; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.partition.DataPartition; +import org.apache.iotdb.commons.partition.DataPartitionQueryParam; +import org.apache.iotdb.commons.path.AlignedPath; +import org.apache.iotdb.commons.path.MeasurementPath; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.Tag; @@ -38,7 +47,22 @@ import org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp; import org.apache.iotdb.db.protocol.session.IClientSession; import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.protocol.thrift.OperationType; +import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; +import org.apache.iotdb.db.queryengine.common.PlanFragmentId; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.common.SessionInfo; +import org.apache.iotdb.db.queryengine.common.header.ColumnHeader; import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; +import org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory; +import org.apache.iotdb.db.queryengine.execution.aggregation.Aggregator; +import org.apache.iotdb.db.queryengine.execution.driver.DriverContext; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; +import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSeriesAggregationScanOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesAggregationScanOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesAggregationScanOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator; import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; @@ -48,8 +72,14 @@ import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; import org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor; import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.StatementType; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.queryengine.plan.statement.crud.DeleteDataStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement; @@ -68,10 +98,13 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.DropSche import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.SetSchemaTemplateStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.UnsetSchemaTemplateStatement; import org.apache.iotdb.db.schemaengine.template.TemplateQueryType; +import org.apache.iotdb.db.storageengine.StorageEngine; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager; import org.apache.iotdb.db.storageengine.rescon.quotas.OperationQuota; import org.apache.iotdb.db.utils.QueryDataSetUtils; import org.apache.iotdb.db.utils.SetThreadName; +import org.apache.iotdb.db.utils.TimePartitionUtils; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -100,6 +133,7 @@ import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp; import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq; import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp; import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp; +import org.apache.iotdb.service.rpc.thrift.TSGroupByQueryIntervalReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq; @@ -122,10 +156,18 @@ import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq; import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq; import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo; import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.read.filter.TimeFilter; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import io.airlift.units.Duration; +import io.jsonwebtoken.lang.Strings; +import org.apache.commons.lang.StringUtils; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,14 +176,21 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.time.ZoneId; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; +import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize; +import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @@ -164,6 +213,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { private final ISchemaFetcher schemaFetcher; + public static Duration DEFAULT_TIME_SLICE = new Duration(60_000, TimeUnit.MILLISECONDS); + @FunctionalInterface public interface SelectResult { boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, int fetchSize) @@ -180,12 +231,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { private static final SelectResult OLD_SELECT_RESULT = (resp, queryExecution, fetchSize) -> { - Pair<TSQueryDataSet, Boolean> pair = - QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, fetchSize); + Pair<TSQueryDataSet, Boolean> pair = convertTsBlockByFetchSize(queryExecution, fetchSize); resp.setQueryDataSet(pair.left); return pair.right; }; + private static final Semaphore querySemaphore = + new Semaphore(Runtime.getRuntime().availableProcessors() * 2); + public ClientRPCServiceImpl() { partitionFetcher = ClusterPartitionFetcher.getInstance(); schemaFetcher = ClusterSchemaFetcher.getInstance(); @@ -558,6 +611,108 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } } + private List<TsBlock> executeGroupByQueryInternal( + SessionInfo sessionInfo, + String device, + String measurement, + TSDataType dataType, + boolean isAligned, + long startTime, + long endTme, + long interval, + TAggregationType aggregationType, + List<DataRegion> dataRegionList) + throws IllegalPathException { + + int dataRegionSize = dataRegionList.size(); + if (dataRegionSize != 1) { + throw new IllegalArgumentException( + "dataRegionList.size() should only be 1 now, current size is " + dataRegionSize); + } + + Filter timeFilter = new TimeFilter.TimeGtEqAndLt(startTime, endTme); + + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine( + instanceId, FragmentInstanceManager.getInstance().instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext( + instanceId, stateMachine, sessionInfo, dataRegionList.get(0), timeFilter); + DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0); + PlanNodeId planNodeId = new PlanNodeId("1"); + driverContext.addOperatorContext(1, planNodeId, SeriesScanOperator.class.getSimpleName()); + driverContext + .getOperatorContexts() + .forEach(operatorContext -> operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE)); + + SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); + scanOptionsBuilder.withAllSensors(Collections.singleton(measurement)); + scanOptionsBuilder.withGlobalTimeFilter(timeFilter); + + Aggregator aggregator = + new Aggregator( + AccumulatorFactory.createAccumulator(aggregationType, dataType, null, null, true), + AggregationStep.SINGLE, + Collections.singletonList(new InputLocation[] {new InputLocation(0, 0)})); + + GroupByTimeParameter groupByTimeParameter = + new GroupByTimeParameter(startTime, endTme, interval, interval, true); + + IMeasurementSchema measurementSchema = new MeasurementSchema(measurement, dataType); + AbstractSeriesAggregationScanOperator operator; + PartialPath path; + if (isAligned) { + path = + new AlignedPath( + device, + Collections.singletonList(measurement), + Collections.singletonList(measurementSchema)); + operator = + new AlignedSeriesAggregationScanOperator( + planNodeId, + (AlignedPath) path, + Ordering.ASC, + scanOptionsBuilder.build(), + driverContext.getOperatorContexts().get(0), + Collections.singletonList(aggregator), + initTimeRangeIterator(groupByTimeParameter, true, true), + groupByTimeParameter, + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES); + } else { + path = new MeasurementPath(device, measurement, measurementSchema); + operator = + new SeriesAggregationScanOperator( + planNodeId, + path, + Ordering.ASC, + scanOptionsBuilder.build(), + driverContext.getOperatorContexts().get(0), + Collections.singletonList(aggregator), + initTimeRangeIterator(groupByTimeParameter, true, true), + groupByTimeParameter, + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES); + } + + try { + List<TsBlock> result = new ArrayList<>(); + fragmentInstanceContext.setSourcePaths(Collections.singletonList(path)); + operator.initQueryDataSource(fragmentInstanceContext.getSharedQueryDataSource()); + + while (operator.hasNext()) { + result.add(operator.next()); + } + + return result; + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + fragmentInstanceContext.releaseResource(); + } + } + @Override public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req) { return executeStatementV2(req); @@ -588,6 +743,76 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return executeAggregationQueryInternal(req, SELECT_RESULT); } + @Override + public TSExecuteStatementResp executeGroupByQueryIntervalQuery(TSGroupByQueryIntervalReq req) + throws TException { + + try { + querySemaphore.acquire(); + + IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); + + String database = req.getDatabase(); + if (StringUtils.isEmpty(database)) { + String[] splits = Strings.split(req.getDevice(), "\\."); + database = String.format("%s.%s", splits[0], splits[1]); + } + String deviceId = req.getDevice(); + String measurementId = req.getMeasurement(); + TSDataType dataType = TSDataType.getTsDataType((byte) req.getDataType()); + + // only one database, one device, one time interval + Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>(); + TTimePartitionSlot timePartitionSlot = + TimePartitionUtils.getTimePartition(req.getStartTime()); + DataPartitionQueryParam queryParam = + new DataPartitionQueryParam( + deviceId, Collections.singletonList(timePartitionSlot), false, false); + sgNameToQueryParamsMap.put(database, Collections.singletonList(queryParam)); + DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap); + List<DataRegion> dataRegionList = new ArrayList<>(); + List<TRegionReplicaSet> replicaSets = + dataPartition.getDataRegionReplicaSet( + deviceId, Collections.singletonList(timePartitionSlot)); + for (TRegionReplicaSet region : replicaSets) { + dataRegionList.add( + StorageEngine.getInstance() + .getDataRegion(new DataRegionId(region.getRegionId().getId()))); + } + + List<TsBlock> blockResult = + executeGroupByQueryInternal( + SESSION_MANAGER.getSessionInfo(clientSession), + deviceId, + measurementId, + dataType, + true, + req.getStartTime(), + req.getEndTime(), + req.getInterval(), + req.getAggregationType(), + dataRegionList); + + String outputColumnName = req.getAggregationType().name(); + List<ColumnHeader> columnHeaders = + Collections.singletonList(new ColumnHeader(outputColumnName, dataType)); + DatasetHeader header = new DatasetHeader(columnHeaders, false); + header.setColumnToTsBlockIndexMap(Collections.singletonList(outputColumnName)); + + TSExecuteStatementResp resp = createResponse(header, 1); + TSQueryDataSet queryDataSet = convertTsBlockByFetchSize(blockResult); + resp.setQueryDataSet(queryDataSet); + + return resp; + } catch (Exception e) { + return RpcUtils.getTSExecuteStatementResp( + onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_AGG_QUERY)); + } finally { + querySemaphore.release(); + SESSION_MANAGER.updateIdleTime(); + } + } + @Override public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) { long startTime = System.nanoTime(); @@ -1132,7 +1357,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) { Pair<TSQueryDataSet, Boolean> pair = - QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize); + convertTsBlockByFetchSize(queryExecution, req.fetchSize); TSQueryDataSet result = pair.left; finished = pair.right; boolean hasResultSet = result.bufferForTime().limit() != 0; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index ae73a672f72..9da3e51c1b0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -91,6 +91,19 @@ public class FragmentInstanceContext extends QueryContext { return instanceContext; } + public static FragmentInstanceContext createFragmentInstanceContext( + FragmentInstanceId id, + FragmentInstanceStateMachine stateMachine, + SessionInfo sessionInfo, + IDataRegionForQuery dataRegion, + Filter timeFilter) { + FragmentInstanceContext instanceContext = + new FragmentInstanceContext(id, stateMachine, sessionInfo, dataRegion, timeFilter); + instanceContext.initialize(); + instanceContext.start(); + return instanceContext; + } + public static FragmentInstanceContext createFragmentInstanceContext( FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, @@ -148,6 +161,21 @@ public class FragmentInstanceContext extends QueryContext { this.dataNodeQueryContext = null; } + private FragmentInstanceContext( + FragmentInstanceId id, + FragmentInstanceStateMachine stateMachine, + SessionInfo sessionInfo, + IDataRegionForQuery dataRegion, + Filter timeFilter) { + this.id = id; + this.stateMachine = stateMachine; + this.executionEndTime.set(END_TIME_INITIAL_VALUE); + this.sessionInfo = sessionInfo; + this.dataRegion = dataRegion; + this.timeFilter = timeFilter; + this.dataNodeQueryContextMap = null; + } + @TestOnly public void setDataRegion(IDataRegionForQuery dataRegion) { this.dataRegion = dataRegion; @@ -392,7 +420,7 @@ public class FragmentInstanceContext extends QueryContext { * All file paths used by this fragment instance must be cleared and thus the usage reference must * be decreased. */ - protected synchronized void releaseResource() { + public synchronized void releaseResource() { // For schema related query FI, closedFilePaths and unClosedFilePaths will be null if (closedFilePaths != null) { for (TsFileResource tsFile : closedFilePaths) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index 7cb67575246..5e9bd9dd7fa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -70,7 +70,7 @@ public class FragmentInstanceManager { private final IDriverScheduler scheduler = DriverScheduler.getInstance(); private final ScheduledExecutorService instanceManagementExecutor; - private final ExecutorService instanceNotificationExecutor; + public final ExecutorService instanceNotificationExecutor; private final Duration infoCacheTime; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java index 10f242195e4..977c1a841c9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java @@ -99,6 +99,197 @@ public class QueryDataSetUtils { return new Pair<>(tsQueryDataSet, finished); } + public static TSQueryDataSet convertTsBlockByFetchSize(List<TsBlock> tsBlocks) + throws IOException { + TSQueryDataSet tsQueryDataSet = new TSQueryDataSet(); + + // one time column and each value column has an actual value buffer and a bitmap value to + // indicate whether it is a null + int columnNum = 1; + int columnNumWithTime = columnNum * 2 + 1; + DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime]; + ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime]; + for (int i = 0; i < columnNumWithTime; i++) { + byteArrayOutputStreams[i] = new ByteArrayOutputStream(); + dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]); + } + + int rowCount = 0; + int[] valueOccupation = new int[columnNum]; + + // used to record a bitmap for every 8 points + int[] bitmaps = new int[columnNum]; + for (TsBlock tsBlock : tsBlocks) { + if (tsBlock.isEmpty()) { + continue; + } + + int currentCount = tsBlock.getPositionCount(); + // serialize time column + for (int i = 0; i < currentCount; i++) { + // use columnOutput to write byte array + dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i)); + } + + // serialize each value column and its bitmap + for (int k = 0; k < columnNum; k++) { + // get DataOutputStream for current value column and its bitmap + DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1]; + DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)]; + + Column column = tsBlock.getColumn(k); + TSDataType type = column.getDataType(); + switch (type) { + case INT32: + for (int i = 0; i < currentCount; i++) { + rowCount++; + if (column.isNull(i)) { + bitmaps[k] = bitmaps[k] << 1; + } else { + bitmaps[k] = (bitmaps[k] << 1) | FLAG; + dataOutputStream.writeInt(column.getInt(i)); + valueOccupation[k] += 4; + } + if (rowCount != 0 && rowCount % 8 == 0) { + dataBitmapOutputStream.writeByte(bitmaps[k]); + // we should clear the bitmap every 8 points + bitmaps[k] = 0; + } + } + break; + case INT64: + for (int i = 0; i < currentCount; i++) { + rowCount++; + if (column.isNull(i)) { + bitmaps[k] = bitmaps[k] << 1; + } else { + bitmaps[k] = (bitmaps[k] << 1) | FLAG; + dataOutputStream.writeLong(column.getLong(i)); + valueOccupation[k] += 8; + } + if (rowCount != 0 && rowCount % 8 == 0) { + dataBitmapOutputStream.writeByte(bitmaps[k]); + // we should clear the bitmap every 8 points + bitmaps[k] = 0; + } + } + break; + case FLOAT: + for (int i = 0; i < currentCount; i++) { + rowCount++; + if (column.isNull(i)) { + bitmaps[k] = bitmaps[k] << 1; + } else { + bitmaps[k] = (bitmaps[k] << 1) | FLAG; + dataOutputStream.writeFloat(column.getFloat(i)); + valueOccupation[k] += 4; + } + if (rowCount != 0 && rowCount % 8 == 0) { + dataBitmapOutputStream.writeByte(bitmaps[k]); + // we should clear the bitmap every 8 points + bitmaps[k] = 0; + } + } + break; + case DOUBLE: + for (int i = 0; i < currentCount; i++) { + rowCount++; + if (column.isNull(i)) { + bitmaps[k] = bitmaps[k] << 1; + } else { + bitmaps[k] = (bitmaps[k] << 1) | FLAG; + dataOutputStream.writeDouble(column.getDouble(i)); + valueOccupation[k] += 8; + } + if (rowCount != 0 && rowCount % 8 == 0) { + dataBitmapOutputStream.writeByte(bitmaps[k]); + // we should clear the bitmap every 8 points + bitmaps[k] = 0; + } + } + break; + case BOOLEAN: + for (int i = 0; i < currentCount; i++) { + rowCount++; + if (column.isNull(i)) { + bitmaps[k] = bitmaps[k] << 1; + } else { + bitmaps[k] = (bitmaps[k] << 1) | FLAG; + dataOutputStream.writeBoolean(column.getBoolean(i)); + valueOccupation[k] += 1; + } + if (rowCount != 0 && rowCount % 8 == 0) { + dataBitmapOutputStream.writeByte(bitmaps[k]); + // we should clear the bitmap every 8 points + bitmaps[k] = 0; + } + } + break; + case TEXT: + for (int i = 0; i < currentCount; i++) { + rowCount++; + if (column.isNull(i)) { + bitmaps[k] = bitmaps[k] << 1; + } else { + bitmaps[k] = (bitmaps[k] << 1) | FLAG; + Binary binary = column.getBinary(i); + dataOutputStream.writeInt(binary.getLength()); + dataOutputStream.write(binary.getValues()); + valueOccupation[k] = valueOccupation[k] + 4 + binary.getLength(); + } + if (rowCount != 0 && rowCount % 8 == 0) { + dataBitmapOutputStream.writeByte(bitmaps[k]); + // we should clear the bitmap every 8 points + bitmaps[k] = 0; + } + } + break; + default: + throw new UnSupportedDataTypeException( + String.format("Data type %s is not supported.", type)); + } + if (k != columnNum - 1) { + rowCount -= currentCount; + } + } + } + // feed the remaining bitmap + int remaining = rowCount % 8; + for (int k = 0; k < columnNum; k++) { + if (remaining != 0) { + DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)]; + dataBitmapOutputStream.writeByte(bitmaps[k] << (8 - remaining)); + } + } + + // calculate the time buffer size + int timeOccupation = rowCount * 8; + ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation); + timeBuffer.put(byteArrayOutputStreams[0].toByteArray()); + timeBuffer.flip(); + tsQueryDataSet.setTime(timeBuffer); + + // calculate the bitmap buffer size + int bitmapOccupation = (rowCount + 7) / 8; + + List<ByteBuffer> bitmapList = new LinkedList<>(); + List<ByteBuffer> valueList = new LinkedList<>(); + for (int i = 1; i < byteArrayOutputStreams.length; i += 2) { + ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 2]); + valueBuffer.put(byteArrayOutputStreams[i].toByteArray()); + valueBuffer.flip(); + valueList.add(valueBuffer); + + ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation); + bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray()); + bitmapBuffer.flip(); + bitmapList.add(bitmapBuffer); + } + tsQueryDataSet.setBitmapList(bitmapList); + tsQueryDataSet.setValueList(valueList); + return tsQueryDataSet; + } + private static void serializeTsBlock( int rowCount, int currentCount, diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java index 04970f079bb..33297dfd79b 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java @@ -19,8 +19,10 @@ package org.apache.iotdb.tsfile.read.filter; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.filter.factory.FilterSerializeId; import org.apache.iotdb.tsfile.read.filter.factory.FilterType; import org.apache.iotdb.tsfile.read.filter.operator.Between; import org.apache.iotdb.tsfile.read.filter.operator.Eq; @@ -30,7 +32,11 @@ import org.apache.iotdb.tsfile.read.filter.operator.In; import org.apache.iotdb.tsfile.read.filter.operator.Lt; import org.apache.iotdb.tsfile.read.filter.operator.LtEq; import org.apache.iotdb.tsfile.read.filter.operator.NotEq; +import org.apache.iotdb.tsfile.read.filter.operator.OrFilter; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -229,4 +235,87 @@ public class TimeFilter { public static Filter defaultTimeFilter(boolean ascending) { return ascending ? TimeFilter.gtEq(Long.MIN_VALUE) : TimeFilter.ltEq(Long.MAX_VALUE); } + + public static class TimeGtEqAndLt implements Filter { + + private long startTime; + + private long endTime; + + public TimeGtEqAndLt() {} + + public TimeGtEqAndLt(long startTime, long endTime) { + this.startTime = startTime; + this.endTime = endTime; + } + + @Override + public boolean satisfy(Statistics statistics) { + return !(statistics.getEndTime() < startTime || statistics.getStartTime() >= endTime); + } + + @Override + public boolean allSatisfy(Statistics statistics) { + return startTime <= statistics.getStartTime() && statistics.getEndTime() < endTime; + } + + @Override + public boolean satisfy(long time, Object value) { + return startTime <= time && time < endTime; + } + + @Override + public boolean satisfyStartEndTime(long startTime, long endTime) { + return !(endTime < this.startTime || startTime >= this.endTime); + } + + @Override + public boolean containStartEndTime(long startTime, long endTime) { + return this.startTime <= startTime && endTime < this.endTime; + } + + @Override + public Filter copy() { + return new TimeGtEqAndLt(startTime, endTime); + } + + @Override + public String toString() { + return "TimeGtEqAndLt{" + "startTime=" + startTime + ", endTime=" + endTime + '}'; + } + + @Override + public void serialize(DataOutputStream outputStream) { + try { + outputStream.write(getSerializeId().ordinal()); + outputStream.writeLong(startTime); + outputStream.writeLong(endTime); + } catch (IOException ignored) { + // ignored + } + } + + @Override + public void deserialize(ByteBuffer buffer) { + startTime = buffer.getLong(); + endTime = buffer.getLong(); + } + + @Override + public FilterSerializeId getSerializeId() { + return FilterSerializeId.TIME_GTEQ_AND_LT; + } + + @Override + public List<TimeRange> getTimeRanges() { + return startTime >= endTime + ? Collections.emptyList() + : Collections.singletonList(new TimeRange(startTime, endTime - 1)); + } + + @Override + public Filter reverse() { + return new OrFilter(new TimeLt(startTime), new TimeGtEq(endTime)); + } + } } diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterSerializeId.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterSerializeId.java index bb3c7e10608..912bc9f3360 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterSerializeId.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterSerializeId.java @@ -34,5 +34,6 @@ public enum FilterSerializeId { IN, REGEXP, LIKE, - BETWEEN + BETWEEN, + TIME_GTEQ_AND_LT } diff --git a/iotdb-protocol/thrift/src/main/thrift/client.thrift b/iotdb-protocol/thrift/src/main/thrift/client.thrift index e111c8d2a35..45d3aa608df 100644 --- a/iotdb-protocol/thrift/src/main/thrift/client.thrift +++ b/iotdb-protocol/thrift/src/main/thrift/client.thrift @@ -352,6 +352,21 @@ struct TSAggregationQueryReq { 11: optional bool legalPathNodes } +struct TSGroupByQueryIntervalReq { + 1: required i64 sessionId + 2: required i64 statementId + 3: required string device + 4: required string measurement + 5: required i32 dataType + 6: required common.TAggregationType aggregationType + 7: optional string database + 8: optional i64 startTime + 9: optional i64 endTime + 10: optional i64 interval + 11: optional i32 fetchSize + 12: optional i64 timeout +} + struct TSCreateMultiTimeseriesReq { 1: required i64 sessionId 2: required list<string> paths @@ -500,6 +515,9 @@ service IClientRPCService { TSExecuteStatementResp executeAggregationQueryV2(1:TSAggregationQueryReq req); + TSExecuteStatementResp executeGroupByQueryIntervalQuery(1:TSGroupByQueryIntervalReq req); + + TSFetchResultsResp fetchResultsV2(1:TSFetchResultsReq req); TSOpenSessionResp openSession(1:TSOpenSessionReq req);
