This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch removeQueryMemoryControl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bfcc9772b5d47f0ec4f460252ebebd0de15eb4f2 Author: Alima777 <[email protected]> AuthorDate: Mon Jun 21 17:39:34 2021 +0800 remove query memory control --- .../main/java/org/apache/iotdb/db/qp/Planner.java | 38 +- .../iotdb/db/qp/strategy/PhysicalGenerator.java | 204 +++++----- .../qp/strategy/optimizer/ConcatPathOptimizer.java | 75 ++-- .../qp/strategy/optimizer/ILogicalOptimizer.java | 7 +- .../db/query/control/QueryResourceManager.java | 102 ++--- .../org/apache/iotdb/db/service/TSServiceImpl.java | 444 +++++++++++---------- .../db/integration/IoTDBSequenceDataQueryIT.java | 139 ++++--- .../iotdb/db/integration/IoTDBSeriesReaderIT.java | 235 ++++++----- .../iotdb/db/qp/plan/LogicalPlanSmallTest.java | 129 +++--- .../apache/iotdb/db/utils/EnvironmentUtils.java | 73 ++-- 10 files changed, 747 insertions(+), 699 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java index cb2ab65..9df8031 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java @@ -26,7 +26,12 @@ import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.constant.SQLConstant; import org.apache.iotdb.db.qp.logical.Operator; -import org.apache.iotdb.db.qp.logical.crud.*; +import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator; +import org.apache.iotdb.db.qp.logical.crud.FilterOperator; +import org.apache.iotdb.db.qp.logical.crud.FromOperator; +import org.apache.iotdb.db.qp.logical.crud.QueryOperator; +import org.apache.iotdb.db.qp.logical.crud.SFWOperator; +import org.apache.iotdb.db.qp.logical.crud.SelectOperator; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.strategy.LogicalGenerator; import org.apache.iotdb.db.qp.strategy.PhysicalGenerator; @@ -34,7 +39,6 @@ import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer; import org.apache.iotdb.db.qp.strategy.optimizer.DnfFilterOptimizer; import org.apache.iotdb.db.qp.strategy.optimizer.MergeSingleFilterOptimizer; import org.apache.iotdb.db.qp.strategy.optimizer.RemoveNotOptimizer; -import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq; @@ -63,16 +67,7 @@ public class Planner { public PhysicalPlan parseSQLToPhysicalPlan(String sqlStr, ZoneId zoneId, int fetchSize) throws QueryProcessException { Operator operator = logicalGenerator.generate(sqlStr, zoneId); - int maxDeduplicatedPathNum = - QueryResourceManager.getInstance().getMaxDeduplicatedPathNum(fetchSize); - if (operator instanceof SFWOperator && ((SFWOperator) operator).isLastQuery()) { - // Dataset of last query actually has only three columns, so we shouldn't limit the path num - // while constructing logical plan - // To avoid overflowing because logicalOptimize function may do maxDeduplicatedPathNum + 1, we - // set it to Integer.MAX_VALUE - 1 - maxDeduplicatedPathNum = Integer.MAX_VALUE - 1; - } - operator = logicalOptimize(operator, maxDeduplicatedPathNum); + operator = logicalOptimize(operator); PhysicalGenerator physicalGenerator = new PhysicalGenerator(); PhysicalPlan physicalPlan = physicalGenerator.transformToPhysicalPlan(operator, fetchSize); physicalPlan.setDebug(operator.isDebug()); @@ -120,16 +115,7 @@ public class Planner { queryOp.setFilterOperator(filterOp); - int maxDeduplicatedPathNum = - QueryResourceManager.getInstance().getMaxDeduplicatedPathNum(rawDataQueryReq.fetchSize); - if (queryOp.isLastQuery()) { - // Dataset of last query actually has only three columns, so we shouldn't limit the path num - // while constructing logical plan - // To avoid overflowing because logicalOptimize function may do maxDeduplicatedPathNum + 1, we - // set it to Integer.MAX_VALUE - 1 - maxDeduplicatedPathNum = Integer.MAX_VALUE - 1; - } - SFWOperator op = (SFWOperator) logicalOptimize(queryOp, maxDeduplicatedPathNum); + SFWOperator op = (SFWOperator) logicalOptimize(queryOp); PhysicalGenerator physicalGenerator = new PhysicalGenerator(); PhysicalPlan physicalPlan = @@ -145,7 +131,7 @@ public class Planner { * @return optimized logical operator * @throws LogicalOptimizeException exception in logical optimizing */ - protected Operator logicalOptimize(Operator operator, int maxDeduplicatedPathNum) + protected Operator logicalOptimize(Operator operator) throws LogicalOperatorException, PathNumOverLimitException { switch (operator.getType()) { case AUTHOR: @@ -178,7 +164,7 @@ public class Planner { case UPDATE: case DELETE: SFWOperator root = (SFWOperator) operator; - return optimizeSFWOperator(root, maxDeduplicatedPathNum); + return optimizeSFWOperator(root); default: throw new LogicalOperatorException(operator.getType().toString(), ""); } @@ -191,10 +177,10 @@ public class Planner { * @return optimized select-from-where operator * @throws LogicalOptimizeException exception in SFW optimizing */ - private SFWOperator optimizeSFWOperator(SFWOperator root, int maxDeduplicatedPathNum) + private SFWOperator optimizeSFWOperator(SFWOperator root) throws LogicalOperatorException, PathNumOverLimitException { ConcatPathOptimizer concatPathOptimizer = getConcatPathOptimizer(); - root = (SFWOperator) concatPathOptimizer.transform(root, maxDeduplicatedPathNum); + root = (SFWOperator) concatPathOptimizer.transform(root); FilterOperator filter = root.getFilterOperator(); if (filter == null) { return root; diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java index d7863c8..7f4eb27 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java @@ -18,19 +18,11 @@ */ package org.apache.iotdb.db.qp.strategy; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; import org.apache.iotdb.db.auth.AuthException; import org.apache.iotdb.db.exception.metadata.MetadataException; -import org.apache.iotdb.db.exception.query.PathNumOverLimitException; import org.apache.iotdb.db.exception.query.LogicalOperatorException; import org.apache.iotdb.db.exception.query.LogicalOptimizeException; +import org.apache.iotdb.db.exception.query.PathNumOverLimitException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.runtime.SQLParserException; import org.apache.iotdb.db.metadata.PartialPath; @@ -103,7 +95,6 @@ import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType; import org.apache.iotdb.db.qp.physical.sys.ShowStorageGroupPlan; import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan; import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan; -import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.qp.physical.sys.TracingPlan; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.SchemaUtils; @@ -111,13 +102,18 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.expression.IExpression; import org.apache.iotdb.tsfile.utils.Pair; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; -/** - * Used to convert logical operator to physical plan - */ +/** Used to convert logical operator to physical plan */ public class PhysicalGenerator { - @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning public PhysicalPlan transformToPhysicalPlan(Operator operator, int fetchSize) throws QueryProcessException { @@ -126,8 +122,13 @@ public class PhysicalGenerator { case AUTHOR: AuthorOperator author = (AuthorOperator) operator; try { - return new AuthorPlan(author.getAuthorType(), author.getUserName(), author.getRoleName(), - author.getPassWord(), author.getNewPassword(), author.getPrivilegeList(), + return new AuthorPlan( + author.getAuthorType(), + author.getUserName(), + author.getRoleName(), + author.getPassWord(), + author.getNewPassword(), + author.getPrivilegeList(), author.getNodeName()); } catch (AuthException e) { throw new QueryProcessException(e.getMessage()); @@ -148,9 +149,10 @@ public class PhysicalGenerator { return new DeleteStorageGroupPlan(deleteStorageGroup.getDeletePathList()); case CREATE_TIMESERIES: CreateTimeSeriesOperator createOperator = (CreateTimeSeriesOperator) operator; - if (createOperator.getTags() != null && !createOperator.getTags().isEmpty() - && createOperator.getAttributes() != null && !createOperator.getAttributes() - .isEmpty()) { + if (createOperator.getTags() != null + && !createOperator.getTags().isEmpty() + && createOperator.getAttributes() != null + && !createOperator.getAttributes().isEmpty()) { for (String tagKey : createOperator.getTags().keySet()) { if (createOperator.getAttributes().containsKey(tagKey)) { throw new QueryProcessException( @@ -159,17 +161,26 @@ public class PhysicalGenerator { } } } - return new CreateTimeSeriesPlan(createOperator.getPath(), createOperator.getDataType(), - createOperator.getEncoding(), createOperator.getCompressor(), createOperator.getProps(), - createOperator.getTags(), createOperator.getAttributes(), createOperator.getAlias()); + return new CreateTimeSeriesPlan( + createOperator.getPath(), + createOperator.getDataType(), + createOperator.getEncoding(), + createOperator.getCompressor(), + createOperator.getProps(), + createOperator.getTags(), + createOperator.getAttributes(), + createOperator.getAlias()); case DELETE_TIMESERIES: DeleteTimeSeriesOperator deletePath = (DeleteTimeSeriesOperator) operator; return new DeleteTimeSeriesPlan(deletePath.getDeletePathList()); case ALTER_TIMESERIES: AlterTimeSeriesOperator alterTimeSeriesOperator = (AlterTimeSeriesOperator) operator; - return new AlterTimeSeriesPlan(alterTimeSeriesOperator.getPath(), - alterTimeSeriesOperator.getAlterType(), alterTimeSeriesOperator.getAlterMap(), - alterTimeSeriesOperator.getAlias(), alterTimeSeriesOperator.getTagsMap(), + return new AlterTimeSeriesPlan( + alterTimeSeriesOperator.getPath(), + alterTimeSeriesOperator.getAlterType(), + alterTimeSeriesOperator.getAlterMap(), + alterTimeSeriesOperator.getAlias(), + alterTimeSeriesOperator.getTagsMap(), alterTimeSeriesOperator.getAttributesMap()); case DELETE: DeleteDataOperator delete = (DeleteDataOperator) operator; @@ -185,8 +196,8 @@ public class PhysicalGenerator { insert.getMeasurementList().length, insert.getValueList().length)); } - return new InsertRowPlan(paths.get(0), insert.getTime(), - insert.getMeasurementList(), insert.getValueList()); + return new InsertRowPlan( + paths.get(0), insert.getTime(), insert.getMeasurementList(), insert.getValueList()); case MERGE: if (operator.getTokenIntType() == SQLConstant.TOK_FULL_MERGE) { return new MergePlan(OperatorType.FULL_MERGE); @@ -201,7 +212,7 @@ public class PhysicalGenerator { return new TracingPlan(tracingOperator.isTracingon()); case QUERY: QueryOperator query = (QueryOperator) operator; - return transformQuery(query, fetchSize); + return transformQuery(query); case TTL: switch (operator.getTokenIntType()) { case SQLConstant.TOK_SET: @@ -214,12 +225,13 @@ public class PhysicalGenerator { ShowTTLOperator showTTLOperator = (ShowTTLOperator) operator; return new ShowTTLPlan(showTTLOperator.getStorageGroups()); default: - throw new LogicalOperatorException(String - .format("not supported operator type %s in ttl operation.", operator.getType())); + throw new LogicalOperatorException( + String.format( + "not supported operator type %s in ttl operation.", operator.getType())); } case LOAD_CONFIGURATION: - LoadConfigurationOperatorType type = ((LoadConfigurationOperator) operator) - .getLoadConfigurationOperatorType(); + LoadConfigurationOperatorType type = + ((LoadConfigurationOperator) operator).getLoadConfigurationOperatorType(); return generateLoadConfigurationPlan(type); case SHOW: switch (operator.getTokenIntType()) { @@ -229,10 +241,14 @@ public class PhysicalGenerator { return new ShowPlan(ShowContentType.VERSION); case SQLConstant.TOK_TIMESERIES: ShowTimeSeriesOperator showTimeSeriesOperator = (ShowTimeSeriesOperator) operator; - return new ShowTimeSeriesPlan(showTimeSeriesOperator.getPath(), - showTimeSeriesOperator.isContains(), showTimeSeriesOperator.getKey(), - showTimeSeriesOperator.getValue(), showTimeSeriesOperator.getLimit(), - showTimeSeriesOperator.getOffset(), showTimeSeriesOperator.isOrderByHeat()); + return new ShowTimeSeriesPlan( + showTimeSeriesOperator.getPath(), + showTimeSeriesOperator.isContains(), + showTimeSeriesOperator.getKey(), + showTimeSeriesOperator.getValue(), + showTimeSeriesOperator.getLimit(), + showTimeSeriesOperator.getOffset(), + showTimeSeriesOperator.isOrderByHeat()); case SQLConstant.TOK_STORAGE_GROUP: return new ShowStorageGroupPlan( ShowContentType.STORAGE_GROUP, ((ShowStorageGroupOperator) operator).getPath()); @@ -246,10 +262,14 @@ public class PhysicalGenerator { return new CountPlan( ShowContentType.COUNT_STORAGE_GROUP, ((CountOperator) operator).getPath()); case SQLConstant.TOK_COUNT_NODE_TIMESERIES: - return new CountPlan(ShowContentType.COUNT_NODE_TIMESERIES, - ((CountOperator) operator).getPath(), ((CountOperator) operator).getLevel()); + return new CountPlan( + ShowContentType.COUNT_NODE_TIMESERIES, + ((CountOperator) operator).getPath(), + ((CountOperator) operator).getLevel()); case SQLConstant.TOK_COUNT_NODES: - return new CountPlan(ShowContentType.COUNT_NODES, ((CountOperator) operator).getPath(), + return new CountPlan( + ShowContentType.COUNT_NODES, + ((CountOperator) operator).getPath(), ((CountOperator) operator).getLevel()); case SQLConstant.TOK_COUNT_TIMESERIES: return new CountPlan( @@ -263,15 +283,19 @@ public class PhysicalGenerator { "not supported operator type %s in show operation.", operator.getType())); } case LOAD_FILES: - return new OperateFilePlan(((LoadFilesOperator) operator).getFile(), - OperatorType.LOAD_FILES, ((LoadFilesOperator) operator).isAutoCreateSchema(), + return new OperateFilePlan( + ((LoadFilesOperator) operator).getFile(), + OperatorType.LOAD_FILES, + ((LoadFilesOperator) operator).isAutoCreateSchema(), ((LoadFilesOperator) operator).getSgLevel()); case REMOVE_FILE: - return new OperateFilePlan(((RemoveFileOperator) operator).getFile(), - OperatorType.REMOVE_FILE); + return new OperateFilePlan( + ((RemoveFileOperator) operator).getFile(), OperatorType.REMOVE_FILE); case MOVE_FILE: - return new OperateFilePlan(((MoveFileOperator) operator).getFile(), - ((MoveFileOperator) operator).getTargetDir(), OperatorType.MOVE_FILE); + return new OperateFilePlan( + ((MoveFileOperator) operator).getFile(), + ((MoveFileOperator) operator).getTargetDir(), + OperatorType.MOVE_FILE); case CLEAR_CACHE: return new ClearCachePlan(); case SHOW_MERGE_STATUS: @@ -297,17 +321,16 @@ public class PhysicalGenerator { throw new QueryProcessException( String.format("Unrecognized load configuration operator type, %s", type.name())); } - } /** * get types for path list * * @return pair.left is the type of column in result set, pair.right is the real type of the - * measurement + * measurement */ - protected Pair<List<TSDataType>, List<TSDataType>> getSeriesTypes(List<PartialPath> paths, - String aggregation) throws MetadataException { + protected Pair<List<TSDataType>, List<TSDataType>> getSeriesTypes( + List<PartialPath> paths, String aggregation) throws MetadataException { List<TSDataType> measurementDataTypes = SchemaUtils.getSeriesTypesByPaths(paths, (String) null); // if the aggregation function is null, the type of column in result set // is equal to the real type of the measurement @@ -325,10 +348,8 @@ public class PhysicalGenerator { return SchemaUtils.getSeriesTypesByPath(paths); } - @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning - private PhysicalPlan transformQuery(QueryOperator queryOperator, int fetchSize) - throws QueryProcessException { + private PhysicalPlan transformQuery(QueryOperator queryOperator) throws QueryProcessException { QueryPlan queryPlan; if (queryOperator.hasAggregation()) { @@ -441,26 +462,27 @@ public class PhysicalGenerator { if (actualPaths.size() == 1) { String columnName = actualPaths.get(0).getMeasurement(); if (originAggregations != null && !originAggregations.isEmpty()) { - measurementAliasMap.put(originAggregations.get(i) + "(" + columnName + ")", - suffixPath.getTsAlias()); + measurementAliasMap.put( + originAggregations.get(i) + "(" + columnName + ")", suffixPath.getTsAlias()); } else { measurementAliasMap.put(columnName, suffixPath.getTsAlias()); } } else if (actualPaths.size() >= 2) { throw new QueryProcessException( - "alias '" + suffixPath.getTsAlias() + "alias '" + + suffixPath.getTsAlias() + "' can only be matched with one time series"); } } // for actual non exist path - if (originAggregations != null && actualPaths.isEmpty() && originAggregations - .isEmpty()) { + if (originAggregations != null + && actualPaths.isEmpty() + && originAggregations.isEmpty()) { String nonExistMeasurement = fullPath.getMeasurement(); if (measurementSetOfGivenSuffix.add(nonExistMeasurement) && measurementTypeMap.get(nonExistMeasurement) != MeasurementType.Exist) { - measurementTypeMap - .put(fullPath.getMeasurement(), MeasurementType.NonExist); + measurementTypeMap.put(fullPath.getMeasurement(), MeasurementType.NonExist); } } @@ -471,17 +493,19 @@ public class PhysicalGenerator { // the actual query in the AlignByDeviceDataSet String aggregation = originAggregations != null && !originAggregations.isEmpty() - ? originAggregations.get(i) : null; + ? originAggregations.get(i) + : null; - Pair<List<TSDataType>, List<TSDataType>> pair = getSeriesTypes(actualPaths, - aggregation); + Pair<List<TSDataType>, List<TSDataType>> pair = + getSeriesTypes(actualPaths, aggregation); List<TSDataType> columnDataTypes = pair.left; List<TSDataType> measurementDataTypes = pair.right; for (int pathIdx = 0; pathIdx < actualPaths.size(); pathIdx++) { PartialPath path = new PartialPath(actualPaths.get(pathIdx).getNodes()); // check datatype consistency - // a example of inconsistency: select s0 from root.sg1.d1, root.sg1.d2 align by device, + // a example of inconsistency: select s0 from root.sg1.d1, root.sg1.d2 align by + // device, // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT. String measurementChecked; if (originAggregations != null && !originAggregations.isEmpty()) { @@ -517,7 +541,7 @@ public class PhysicalGenerator { } catch (MetadataException e) { throw new LogicalOptimizeException( String.format( - "Error when getting all paths of a full path: %s", fullPath.getFullPath()) + "Error when getting all paths of a full path: %s", fullPath.getFullPath()) + e.getMessage()); } } @@ -539,13 +563,6 @@ public class PhysicalGenerator { measurements = slimitTrimColumn(measurements, seriesSlimit, seriesOffset); } - int maxDeduplicatedPathNum = QueryResourceManager.getInstance() - .getMaxDeduplicatedPathNum(fetchSize); - - if (measurements.size() > maxDeduplicatedPathNum) { - throw new PathNumOverLimitException(maxDeduplicatedPathNum, measurements.size()); - } - // assigns to alignByDevicePlan alignByDevicePlan.setMeasurements(measurements); alignByDevicePlan.setMeasurementAliasMap(measurementAliasMap); @@ -591,7 +608,7 @@ public class PhysicalGenerator { } } try { - deduplicate(queryPlan, fetchSize); + deduplicate(queryPlan); } catch (MetadataException e) { throw new QueryProcessException(e); } @@ -621,8 +638,8 @@ public class PhysicalGenerator { for (int i = 0; i < filterPathList.size(); i++) { pathTSDataTypeHashMap.put(filterPathList.get(i), seriesTypes.get(i)); } - deviceToFilterMap - .put(device.getFullPath(), newOperator.transformToExpression(pathTSDataTypeHashMap)); + deviceToFilterMap.put( + device.getFullPath(), newOperator.transformToExpression(pathTSDataTypeHashMap)); filterPaths.clear(); } catch (MetadataException e) { throw new QueryProcessException(e); @@ -648,8 +665,8 @@ public class PhysicalGenerator { return retDevices; } - private void concatFilterPath(PartialPath prefix, FilterOperator operator, - Set<PartialPath> filterPaths) { + private void concatFilterPath( + PartialPath prefix, FilterOperator operator, Set<PartialPath> filterPaths) { if (!operator.isLeaf()) { for (FilterOperator child : operator.getChildren()) { concatFilterPath(prefix, child, filterPaths); @@ -660,8 +677,8 @@ public class PhysicalGenerator { PartialPath filterPath = basicOperator.getSinglePath(); // do nothing in the cases of "where time > 5" or "where root.d1.s1 > 5" - if (SQLConstant.isReservedPath(filterPath) || filterPath.getFirstNode() - .startsWith(SQLConstant.ROOT)) { + if (SQLConstant.isReservedPath(filterPath) + || filterPath.getFirstNode().startsWith(SQLConstant.ROOT)) { filterPaths.add(filterPath); return; } @@ -672,7 +689,7 @@ public class PhysicalGenerator { } @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning - private void deduplicate(QueryPlan queryPlan, int fetchSize) + private void deduplicate(QueryPlan queryPlan) throws MetadataException, PathNumOverLimitException { // generate dataType first List<PartialPath> paths = queryPlan.getPaths(); @@ -684,18 +701,6 @@ public class PhysicalGenerator { return; } - if (queryPlan instanceof GroupByTimePlan) { - GroupByTimePlan plan = (GroupByTimePlan) queryPlan; - // the actual row number of group by query should be calculated from startTime, endTime and interval. - long interval = (plan.getEndTime() - plan.getStartTime()) / plan.getInterval(); - if (interval > 0) { - fetchSize = Math.min((int) (interval), fetchSize); - } - } else if (queryPlan instanceof AggregationPlan) { - // the actual row number of aggregation query is 1 - fetchSize = 1; - } - RawDataQueryPlan rawDataQueryPlan = (RawDataQueryPlan) queryPlan; Set<String> columnSet = new HashSet<>(); // if it's a last query, no need to sort by device @@ -724,15 +729,13 @@ public class PhysicalGenerator { } indexedPaths.sort(Comparator.comparing(pair -> pair.left)); - int maxDeduplicatedPathNum = QueryResourceManager.getInstance() - .getMaxDeduplicatedPathNum(fetchSize); - int deduplicatedPathNum = 0; int index = 0; for (Pair<PartialPath, Integer> indexedPath : indexedPaths) { String column = indexedPath.left.getTsAlias(); if (column == null) { column = - indexedPath.left.getMeasurementAlias() != null ? indexedPath.left.getFullPathWithAlias() + indexedPath.left.getMeasurementAlias() != null + ? indexedPath.left.getFullPathWithAlias() : indexedPath.left.toString(); if (queryPlan instanceof AggregationPlan) { column = queryPlan.getAggregations().get(indexedPath.right) + "(" + column + ")"; @@ -742,10 +745,6 @@ public class PhysicalGenerator { TSDataType seriesType = dataTypes.get(indexedPath.right); rawDataQueryPlan.addDeduplicatedPaths(indexedPath.left); rawDataQueryPlan.addDeduplicatedDataTypes(seriesType); - deduplicatedPathNum++; - if (deduplicatedPathNum > maxDeduplicatedPathNum) { - throw new PathNumOverLimitException(maxDeduplicatedPathNum, deduplicatedPathNum); - } columnSet.add(column); rawDataQueryPlan.addPathToIndex(column, index++); if (queryPlan instanceof AggregationPlan) { @@ -762,9 +761,10 @@ public class PhysicalGenerator { // check parameter range if (seriesOffset >= size) { - throw new QueryProcessException(String.format( - "The value of SOFFSET (%d) is equal to or exceeds the number of sequences (%d) that can actually be returned.", - seriesOffset, size)); + throw new QueryProcessException( + String.format( + "The value of SOFFSET (%d) is equal to or exceeds the number of sequences (%d) that can actually be returned.", + seriesOffset, size)); } int endPosition = seriesOffset + seriesLimit; if (endPosition > size) { diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java index 3b84a5f..69bb9f7 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java @@ -18,10 +18,6 @@ */ package org.apache.iotdb.db.qp.strategy.optimizer; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.LogicalOptimizeException; import org.apache.iotdb.db.exception.query.PathNumOverLimitException; @@ -38,21 +34,27 @@ import org.apache.iotdb.db.qp.logical.crud.SFWOperator; import org.apache.iotdb.db.qp.logical.crud.SelectOperator; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.tsfile.utils.Pair; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * concat paths in select and from clause. - */ +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** concat paths in select and from clause. */ public class ConcatPathOptimizer implements ILogicalOptimizer { private static final Logger logger = LoggerFactory.getLogger(ConcatPathOptimizer.class); - private static final String WARNING_NO_SUFFIX_PATHS = "given SFWOperator doesn't have suffix paths, cannot concat seriesPath"; - private static final String WARNING_NO_PREFIX_PATHS = "given SFWOperator doesn't have prefix paths, cannot concat seriesPath"; + private static final String WARNING_NO_SUFFIX_PATHS = + "given SFWOperator doesn't have suffix paths, cannot concat seriesPath"; + private static final String WARNING_NO_PREFIX_PATHS = + "given SFWOperator doesn't have prefix paths, cannot concat seriesPath"; @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning @Override - public Operator transform(Operator operator, int maxDeduplicatedPathNum) + public Operator transform(Operator operator) throws LogicalOptimizeException, PathNumOverLimitException { if (!(operator instanceof SFWOperator)) { logger.warn("given operator isn't SFWOperator, cannot concat seriesPath"); @@ -93,7 +95,7 @@ public class ConcatPathOptimizer implements ILogicalOptimizer { // concat paths and remove stars int seriesLimit = ((QueryOperator) operator).getSeriesLimit(); int seriesOffset = ((QueryOperator) operator).getSeriesOffset(); - concatSelect(prefixPaths, select, seriesLimit, seriesOffset, maxDeduplicatedPathNum); + concatSelect(prefixPaths, select, seriesLimit, seriesOffset); } else { isAlignByDevice = true; for (PartialPath path : initialSuffixPaths) { @@ -120,7 +122,8 @@ public class ConcatPathOptimizer implements ILogicalOptimizer { sfwOperator.setFilterOperator(concatFilter(prefixPaths, filter, filterPaths)); } sfwOperator.getFilterOperator().setPathSet(filterPaths); - // GROUP_BY_DEVICE leaves the concatFilter to PhysicalGenerator to optimize filter without prefix first + // GROUP_BY_DEVICE leaves the concatFilter to PhysicalGenerator to optimize filter without + // prefix first return sfwOperator; } @@ -158,8 +161,8 @@ public class ConcatPathOptimizer implements ILogicalOptimizer { * Extract paths from select&from cql, expand them into complete versions, and reassign them to * selectOperator's suffixPathList. Treat aggregations similarly. */ - private void concatSelect(List<PartialPath> fromPaths, SelectOperator selectOperator, int limit, - int offset, int maxDeduplicatedPathNum) + private void concatSelect( + List<PartialPath> fromPaths, SelectOperator selectOperator, int limit, int offset) throws LogicalOptimizeException, PathNumOverLimitException { List<PartialPath> suffixPaths = judgeSelectOperator(selectOperator); @@ -180,12 +183,12 @@ public class ConcatPathOptimizer implements ILogicalOptimizer { } } - removeStarsInPath(allPaths, afterConcatAggregations, selectOperator, limit, offset, - maxDeduplicatedPathNum); + removeStarsInPath(allPaths, afterConcatAggregations, selectOperator, limit, offset); } - private FilterOperator concatFilter(List<PartialPath> fromPaths, FilterOperator operator, - Set<PartialPath> filterPaths) throws LogicalOptimizeException { + private FilterOperator concatFilter( + List<PartialPath> fromPaths, FilterOperator operator, Set<PartialPath> filterPaths) + throws LogicalOptimizeException { if (!operator.isLeaf()) { List<FilterOperator> newFilterList = new ArrayList<>(); for (FilterOperator child : operator.getChildren()) { @@ -197,8 +200,8 @@ public class ConcatPathOptimizer implements ILogicalOptimizer { FunctionOperator functionOperator = (FunctionOperator) operator; PartialPath filterPath = functionOperator.getSinglePath(); // do nothing in the cases of "where time > 5" or "where root.d1.s1 > 5" - if (SQLConstant.isReservedPath(filterPath) || filterPath.getFirstNode() - .startsWith(SQLConstant.ROOT)) { + if (SQLConstant.isReservedPath(filterPath) + || filterPath.getFirstNode().startsWith(SQLConstant.ROOT)) { filterPaths.add(filterPath); return operator; } @@ -221,8 +224,8 @@ public class ConcatPathOptimizer implements ILogicalOptimizer { } } - private FilterOperator constructBinaryFilterTreeWithAnd(List<PartialPath> noStarPaths, - FilterOperator operator) throws LogicalOptimizeException { + private FilterOperator constructBinaryFilterTreeWithAnd( + List<PartialPath> noStarPaths, FilterOperator operator) throws LogicalOptimizeException { FilterOperator filterBinaryTree = new FilterOperator(SQLConstant.KW_AND); FilterOperator currentNode = filterBinaryTree; for (int i = 0; i < noStarPaths.size(); i++) { @@ -233,7 +236,9 @@ public class ConcatPathOptimizer implements ILogicalOptimizer { } try { currentNode.addChildOperator( - new BasicFunctionOperator(operator.getTokenIntType(), noStarPaths.get(i), + new BasicFunctionOperator( + operator.getTokenIntType(), + noStarPaths.get(i), ((BasicFunctionOperator) operator).getValue())); } catch (SQLParserException e) { throw new LogicalOptimizeException(e.getMessage()); @@ -273,12 +278,15 @@ public class ConcatPathOptimizer implements ILogicalOptimizer { } @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning - private void removeStarsInPath(List<PartialPath> paths, List<String> afterConcatAggregations, - SelectOperator selectOperator, int finalLimit, int finalOffset, int maxDeduplicatedPathNum) + private void removeStarsInPath( + List<PartialPath> paths, + List<String> afterConcatAggregations, + SelectOperator selectOperator, + int finalLimit, + int finalOffset) throws LogicalOptimizeException, PathNumOverLimitException { int offset = finalOffset; - int limit = finalLimit == 0 || maxDeduplicatedPathNum < finalLimit - ? maxDeduplicatedPathNum + 1 : finalLimit; + int limit = finalLimit == 0 ? 10000 : finalLimit; int consumed = 0; List<PartialPath> retPaths = new ArrayList<>(); List<String> newAggregations = new ArrayList<>(); @@ -293,7 +301,8 @@ public class ConcatPathOptimizer implements ILogicalOptimizer { actualPaths.get(0).setTsAlias(paths.get(i).getTsAlias()); } else if (actualPaths.size() >= 2) { throw new LogicalOptimizeException( - "alias '" + paths.get(i).getTsAlias() + "alias '" + + paths.get(i).getTsAlias() + "' can only be matched with one time series"); } } @@ -313,9 +322,6 @@ public class ConcatPathOptimizer implements ILogicalOptimizer { limit -= pair.right; } if (limit == 0) { - if (retPaths.size() == maxDeduplicatedPathNum + 1) { - throw new PathNumOverLimitException(maxDeduplicatedPathNum); - } break; } } catch (MetadataException e) { @@ -324,9 +330,10 @@ public class ConcatPathOptimizer implements ILogicalOptimizer { } if (consumed == 0 ? finalOffset != 0 : retPaths.isEmpty()) { - throw new LogicalOptimizeException(String.format( - "The value of SOFFSET (%d) is equal to or exceeds the number of sequences (%d) that can actually be returned.", - finalOffset, consumed)); + throw new LogicalOptimizeException( + String.format( + "The value of SOFFSET (%d) is equal to or exceeds the number of sequences (%d) that can actually be returned.", + finalOffset, consumed)); } selectOperator.setSuffixPathList(retPaths); selectOperator.setAggregations(newAggregations); diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ILogicalOptimizer.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ILogicalOptimizer.java index 4467a8a..e96019d 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ILogicalOptimizer.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ILogicalOptimizer.java @@ -22,12 +22,9 @@ import org.apache.iotdb.db.exception.query.LogicalOptimizeException; import org.apache.iotdb.db.exception.query.PathNumOverLimitException; import org.apache.iotdb.db.qp.logical.Operator; -/** - * provide a context, transform it for optimization. - */ +/** provide a context, transform it for optimization. */ @FunctionalInterface public interface ILogicalOptimizer { - Operator transform(Operator operator, int maxDeduplicatedPathNum) - throws LogicalOptimizeException, PathNumOverLimitException; + Operator transform(Operator operator) throws LogicalOptimizeException, PathNumOverLimitException; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java index e4491cd..13811cf 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java @@ -18,16 +18,6 @@ */ package org.apache.iotdb.db.query.control; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -41,17 +31,25 @@ import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.externalsort.serialize.IExternalSortFileDeserializer; import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression; import org.apache.iotdb.tsfile.read.filter.basic.Filter; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; /** - * <p> * QueryResourceManager manages resource (file streams) used by each query job, and assign Ids to * the jobs. During the life cycle of a query, the following methods must be called in strict order: * 1. assignQueryId - get an Id for the new query. 2. getQueryDataSource - open files for the job or * reuse existing readers. 3. endQueryForGivenJob - release the resource used by this job. - * </p> */ public class QueryResourceManager { @@ -69,55 +67,25 @@ public class QueryResourceManager { /** * Record temporary files used for external sorting. - * <p> - * Key: query job id. Value: temporary file list used for external sorting. + * + * <p>Key: query job id. Value: temporary file list used for external sorting. */ private final Map<Long, List<IExternalSortFileDeserializer>> externalSortFileMap; - private final Map<Long, Long> queryIdEstimatedMemoryMap; - - // current total free memory for reading process(not including the cache memory) - private final AtomicLong totalFreeMemoryForRead; - - // estimated size for one point memory size, the unit is byte - private static final long POINT_ESTIMATED_SIZE = 16L; - - private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); - private QueryResourceManager() { filePathsManager = new QueryFileManager(); externalSortFileMap = new ConcurrentHashMap<>(); - queryIdEstimatedMemoryMap = new ConcurrentHashMap<>(); - totalFreeMemoryForRead = new AtomicLong( - IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForReadWithoutCache()); } public static QueryResourceManager getInstance() { return QueryTokenManagerHelper.INSTANCE; } - public int getMaxDeduplicatedPathNum(int fetchSize) { - return Math.min((int) ((totalFreeMemoryForRead.get() / fetchSize) / POINT_ESTIMATED_SIZE), - CONFIG.getMaxQueryDeduplicatedPathNum()); - } - - /** - * Register a new query. When a query request is created firstly, this method must be invoked. - */ - public long assignQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) { + /** Register a new query. When a query request is created firstly, this method must be invoked. */ + public long assignQueryId(boolean isDataQuery) { long queryId = queryIdAtom.incrementAndGet(); if (isDataQuery) { filePathsManager.addQueryId(queryId); - if (deduplicatedPathNum > 0) { - long estimatedMemoryUsage = - (long) deduplicatedPathNum * POINT_ESTIMATED_SIZE * (long) fetchSize; - // apply the memory successfully - if (totalFreeMemoryForRead.addAndGet(-estimatedMemoryUsage) >= 0) { - queryIdEstimatedMemoryMap.put(queryId, estimatedMemoryUsage); - } else { - totalFreeMemoryForRead.addAndGet(estimatedMemoryUsage); - } - } } return queryId; } @@ -133,27 +101,30 @@ public class QueryResourceManager { /** * register temporary file generated by external sort for resource release. * - * @param queryId query job id + * @param queryId query job id * @param deserializer deserializer of temporary file in external sort. */ - public void registerTempExternalSortFile(long queryId, - IExternalSortFileDeserializer deserializer) { + public void registerTempExternalSortFile( + long queryId, IExternalSortFileDeserializer deserializer) { externalSortFileMap.computeIfAbsent(queryId, x -> new ArrayList<>()).add(deserializer); } - public QueryDataSource getQueryDataSource(PartialPath selectedPath, - QueryContext context, Filter filter) throws StorageEngineException, QueryProcessException { + public QueryDataSource getQueryDataSource( + PartialPath selectedPath, QueryContext context, Filter filter) + throws StorageEngineException, QueryProcessException { - SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(selectedPath, - filter); + SingleSeriesExpression singleSeriesExpression = + new SingleSeriesExpression(selectedPath, filter); QueryDataSource queryDataSource; - queryDataSource = StorageEngine.getInstance() - .query(singleSeriesExpression, context, filePathsManager); + queryDataSource = + StorageEngine.getInstance().query(singleSeriesExpression, context, filePathsManager); // calculate the distinct number of seq and unseq tsfiles if (config.isEnablePerformanceTracing()) { - seqFileNumMap.computeIfAbsent(context.getQueryId(), k -> new HashSet<>()) + seqFileNumMap + .computeIfAbsent(context.getQueryId(), k -> new HashSet<>()) .addAll((queryDataSource.getSeqResources())); - unseqFileNumMap.computeIfAbsent(context.getQueryId(), k -> new HashSet<>()) + unseqFileNumMap + .computeIfAbsent(context.getQueryId(), k -> new HashSet<>()) .addAll((queryDataSource.getUnseqResources())); } return queryDataSource; @@ -169,8 +140,9 @@ public class QueryResourceManager { if (config.isEnablePerformanceTracing()) { boolean isprinted = false; if (seqFileNumMap.get(queryId) != null && unseqFileNumMap.get(queryId) != null) { - TracingManager.getInstance().writeTsFileInfo(queryId, seqFileNumMap.remove(queryId), - unseqFileNumMap.remove(queryId)); + TracingManager.getInstance() + .writeTsFileInfo( + queryId, seqFileNumMap.remove(queryId), unseqFileNumMap.remove(queryId)); isprinted = true; } if (chunkNumMap.get(queryId) != null && chunkSizeMap.get(queryId) != null) { @@ -184,7 +156,8 @@ public class QueryResourceManager { } catch (IOException e) { logger.error( "Error while writing performance info to {}, {}", - config.getTracingDir() + File.separator + IoTDBConstant.TRACING_LOG, e.getMessage()); + config.getTracingDir() + File.separator + IoTDBConstant.TRACING_LOG, + e.getMessage()); } // close file stream of external sort files, and delete @@ -199,12 +172,6 @@ public class QueryResourceManager { externalSortFileMap.remove(queryId); } - // put back the memory usage - Long estimatedMemoryUsage = queryIdEstimatedMemoryMap.remove(queryId); - if (estimatedMemoryUsage != null) { - totalFreeMemoryForRead.addAndGet(estimatedMemoryUsage); - } - // remove usage of opened file paths of current thread filePathsManager.removeUsedFilesForQuery(queryId); } @@ -213,7 +180,6 @@ public class QueryResourceManager { private static final QueryResourceManager INSTANCE = new QueryResourceManager(); - private QueryTokenManagerHelper() { - } + private QueryTokenManagerHelper() {} } } diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index da62967..f3bc3e5 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -18,28 +18,6 @@ */ package org.apache.iotdb.db.service; -import static org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType.TIMESERIES; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.sql.SQLException; -import java.time.ZoneId; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; -import org.antlr.v4.runtime.misc.ParseCancellationException; import org.apache.iotdb.db.auth.AuthException; import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer; @@ -71,13 +49,11 @@ import org.apache.iotdb.db.qp.physical.crud.AggregationPlan; import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan; import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; -import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; -import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; @@ -137,19 +113,40 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; + +import org.antlr.v4.runtime.misc.ParseCancellationException; import org.apache.thrift.TException; import org.apache.thrift.server.ServerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.sql.SQLException; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; -/** - * Thrift RPC implementation at server side. - */ +import static org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType.TIMESERIES; + +/** Thrift RPC implementation at server side. */ public class TSServiceImpl implements TSIService.Iface, ServerContext { - private static final Logger auditLogger = LoggerFactory - .getLogger(IoTDBConstant.AUDIT_LOGGER_NAME); + private static final Logger auditLogger = + LoggerFactory.getLogger(IoTDBConstant.AUDIT_LOGGER_NAME); private static final Logger logger = LoggerFactory.getLogger(TSServiceImpl.class); private static final Logger SLOW_SQL_LOGGER = LoggerFactory.getLogger("SLOW_SQL"); private static final Logger QUERY_FREQUENCY_LOGGER = LoggerFactory.getLogger("QUERY_FREQUENCY"); @@ -189,25 +186,26 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { // When the client abnormally exits, we can still know who to disconnect private ThreadLocal<Long> currSessionId = new ThreadLocal<>(); - public static final TSProtocolVersion CURRENT_RPC_VERSION = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3; - + public static final TSProtocolVersion CURRENT_RPC_VERSION = + TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3; private static final AtomicInteger queryCount = new AtomicInteger(0); - public TSServiceImpl() throws QueryProcessException { processor = new Planner(); executor = new PlanExecutor(); - ScheduledExecutorService timedQuerySqlCountThread = Executors - .newSingleThreadScheduledExecutor(r -> new Thread(r, "timedQuerySqlCountThread")); - timedQuerySqlCountThread.scheduleAtFixedRate(() -> { + ScheduledExecutorService timedQuerySqlCountThread = + Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "timedQuerySqlCountThread")); + timedQuerySqlCountThread.scheduleAtFixedRate( + () -> { if (queryCount.get() != 0) { - QUERY_FREQUENCY_LOGGER - .info("Query count in current 1 minute: " + queryCount.getAndSet(0)); + QUERY_FREQUENCY_LOGGER.info( + "Query count in current 1 minute: " + queryCount.getAndSet(0)); } }, - config.getFrequencyIntervalInMinute(), config.getFrequencyIntervalInMinute(), + config.getFrequencyIntervalInMinute(), + config.getFrequencyIntervalInMinute(), TimeUnit.MINUTES); } @@ -236,11 +234,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { TSStatus tsStatus; long sessionId = -1; if (status) { - //check the version compatibility + // check the version compatibility boolean compatible = checkCompatibility(req.getClient_protocol()); if (!compatible) { - tsStatus = RpcUtils.getStatus(TSStatusCode.INCOMPATIBLE_VERSION, - "The version is incompatible, please upgrade to " + IoTDBConstant.VERSION); + tsStatus = + RpcUtils.getStatus( + TSStatusCode.INCOMPATIBLE_VERSION, + "The version is incompatible, please upgrade to " + IoTDBConstant.VERSION); TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION); resp.setSessionId(sessionId); return resp; @@ -253,16 +253,19 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { currSessionId.set(sessionId); auditLogger.info("User {} opens Session-{}", req.getUsername(), sessionId); logger.info( - "{}: Login status: {}. User : {}", IoTDBConstant.GLOBAL_DB_NAME, tsStatus.message, + "{}: Login status: {}. User : {}", + IoTDBConstant.GLOBAL_DB_NAME, + tsStatus.message, req.getUsername()); } else { - tsStatus = RpcUtils.getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR, - loginMessage != null ? loginMessage : "Authentication failed."); - auditLogger - .info("User {} opens Session failed with an incorrect password", req.getUsername()); + tsStatus = + RpcUtils.getStatus( + TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR, + loginMessage != null ? loginMessage : "Authentication failed."); + auditLogger.info( + "User {} opens Session failed with an incorrect password", req.getUsername()); } - TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, - CURRENT_RPC_VERSION); + TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION); resp.setSessionId(sessionId); return resp; } @@ -321,7 +324,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { @Override public TSStatus closeOperation(TSCloseOperationReq req) { if (auditLogger.isDebugEnabled()) { - auditLogger.debug("{}: receive close operation from Session {}", IoTDBConstant.GLOBAL_DB_NAME, + auditLogger.debug( + "{}: receive close operation from Session {}", + IoTDBConstant.GLOBAL_DB_NAME, currSessionId.get()); } if (!checkLogin(req.getSessionId())) { @@ -359,9 +364,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } - /** - * release single operation resource - */ + /** release single operation resource */ protected void releaseQueryResource(long queryId) throws StorageEngineException { // remove the corresponding Physical Plan queryId2DataSet.remove(queryId); @@ -391,9 +394,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { break; case "ALL_COLUMNS": resp.setColumnsList( - getPaths(new PartialPath(req.getColumnPath())).stream().map(PartialPath::getFullPath) - .collect( - Collectors.toList())); + getPaths(new PartialPath(req.getColumnPath())).stream() + .map(PartialPath::getFullPath) + .collect(Collectors.toList())); status = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); break; default: @@ -440,8 +443,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { for (String statement : statements) { long t2 = System.currentTimeMillis(); isAllSuccessful = - executeStatementInBatch(statement, result, req.getSessionId()) - && isAllSuccessful; + executeStatementInBatch(statement, result, req.getSessionId()) && isAllSuccessful; Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2); } if (isAllSuccessful) { @@ -452,8 +454,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } } catch (Exception e) { logger.error(SERVER_INTERNAL_ERROR, IoTDBConstant.GLOBAL_DB_NAME, e); - return RpcUtils - .getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); + return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); } finally { Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_JDBC_BATCH, t1); } @@ -463,8 +464,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { // on finding queries in a batch, such query will be ignored and an error will be generated private boolean executeStatementInBatch(String statement, List<TSStatus> result, long sessionId) { try { - PhysicalPlan physicalPlan = processor - .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(sessionId), DEFAULT_FETCH_SIZE); + PhysicalPlan physicalPlan = + processor.parseSQLToPhysicalPlan( + statement, sessionIdZoneIdMap.get(sessionId), DEFAULT_FETCH_SIZE); if (physicalPlan.isQuery()) { throw new QueryInBatchStatementException(statement); } @@ -477,32 +479,38 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } } catch (ParseCancellationException e) { logger.warn(ERROR_PARSING_SQL, statement + " " + e.getMessage()); - result.add(RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, - ERROR_PARSING_SQL + " " + statement + " " + e.getMessage())); + result.add( + RpcUtils.getStatus( + TSStatusCode.SQL_PARSE_ERROR, + ERROR_PARSING_SQL + " " + statement + " " + e.getMessage())); return false; } catch (SQLParserException e) { logger.error("Error occurred when executing {}, check metadata error: ", statement, e); - result.add(RpcUtils.getStatus( - TSStatusCode.SQL_PARSE_ERROR, - ERROR_PARSING_SQL + " " + statement + " " + e.getMessage())); + result.add( + RpcUtils.getStatus( + TSStatusCode.SQL_PARSE_ERROR, + ERROR_PARSING_SQL + " " + statement + " " + e.getMessage())); return false; } catch (QueryProcessException e) { logger.info( "Error occurred when executing {}, meet error while parsing SQL to physical plan: {}", - statement, e.getMessage()); - result.add(RpcUtils.getStatus( - TSStatusCode.QUERY_PROCESS_ERROR, "Meet error in query process: " + e.getMessage())); + statement, + e.getMessage()); + result.add( + RpcUtils.getStatus( + TSStatusCode.QUERY_PROCESS_ERROR, "Meet error in query process: " + e.getMessage())); return false; } catch (QueryInBatchStatementException e) { logger.info("Error occurred when executing {}, query statement not allowed: ", statement, e); result.add( - RpcUtils.getStatus(TSStatusCode.QUERY_NOT_ALLOWED, - "query statement not allowed: " + statement)); + RpcUtils.getStatus( + TSStatusCode.QUERY_NOT_ALLOWED, "query statement not allowed: " + statement)); return false; } catch (Exception e) { logger.error(SERVER_INTERNAL_ERROR, IoTDBConstant.GLOBAL_DB_NAME, e); - result.add(RpcUtils.getStatus( - TSStatusCode.INTERNAL_SERVER_ERROR, "server Internal Error: " + e.getMessage())); + result.add( + RpcUtils.getStatus( + TSStatusCode.INTERNAL_SERVER_ERROR, "server Internal Error: " + e.getMessage())); } return true; } @@ -516,11 +524,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } String statement = req.getStatement(); - PhysicalPlan physicalPlan = processor - .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(req.getSessionId()), - req.fetchSize); + PhysicalPlan physicalPlan = + processor.parseSQLToPhysicalPlan( + statement, sessionIdZoneIdMap.get(req.getSessionId()), req.fetchSize); if (physicalPlan.isQuery()) { - return internalExecuteQueryStatement(statement, req.statementId, physicalPlan, + return internalExecuteQueryStatement( + statement, + req.statementId, + physicalPlan, req.fetchSize, sessionIdUsernameMap.get(req.getSessionId())); } else { @@ -536,8 +547,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } catch (QueryProcessException e) { logger.info(ERROR_PARSING_SQL, e.getMessage()); return RpcUtils.getTSExecuteStatementResp( - RpcUtils.getStatus(TSStatusCode.QUERY_PROCESS_ERROR, - "Meet error in query process: " + e.getMessage())); + RpcUtils.getStatus( + TSStatusCode.QUERY_PROCESS_ERROR, "Meet error in query process: " + e.getMessage())); } catch (Exception e) { logger.error(SERVER_INTERNAL_ERROR, IoTDBConstant.GLOBAL_DB_NAME, e); return RpcUtils.getTSExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); @@ -555,9 +566,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { String statement = req.getStatement(); PhysicalPlan physicalPlan; try { - physicalPlan = processor - .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(req.getSessionId()), - req.fetchSize); + physicalPlan = + processor.parseSQLToPhysicalPlan( + statement, sessionIdZoneIdMap.get(req.getSessionId()), req.fetchSize); } catch (QueryProcessException | SQLParserException e) { logger.info(ERROR_PARSING_SQL, req.getStatement() + " " + e.getMessage()); return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR, e.getMessage()); @@ -568,13 +579,17 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement."); } - return internalExecuteQueryStatement(statement, req.statementId, physicalPlan, req.fetchSize, + return internalExecuteQueryStatement( + statement, + req.statementId, + physicalPlan, + req.fetchSize, sessionIdUsernameMap.get(req.getSessionId())); } catch (ParseCancellationException e) { logger.warn(ERROR_PARSING_SQL, req.getStatement() + " " + e.getMessage()); - return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR, - ERROR_PARSING_SQL + e.getMessage()); + return RpcUtils.getTSExecuteStatementResp( + TSStatusCode.SQL_PARSE_ERROR, ERROR_PARSING_SQL + e.getMessage()); } catch (SQLParserException e) { logger.error(CHECK_METADATA_ERROR, e); return RpcUtils.getTSExecuteStatementResp( @@ -596,8 +611,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { PhysicalPlan physicalPlan; try { - physicalPlan = - processor.rawDataQueryReqToPhysicalPlan(req); + physicalPlan = processor.rawDataQueryReqToPhysicalPlan(req); } catch (QueryProcessException | SQLParserException e) { logger.info(ERROR_PARSING_SQL, e.getMessage()); return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR, e.getMessage()); @@ -608,13 +622,17 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement."); } - return internalExecuteQueryStatement("", req.statementId, physicalPlan, req.fetchSize, + return internalExecuteQueryStatement( + "", + req.statementId, + physicalPlan, + req.fetchSize, sessionIdUsernameMap.get(req.getSessionId())); } catch (ParseCancellationException e) { logger.warn(ERROR_PARSING_SQL, e.getMessage()); - return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR, - ERROR_PARSING_SQL + e.getMessage()); + return RpcUtils.getTSExecuteStatementResp( + TSStatusCode.SQL_PARSE_ERROR, ERROR_PARSING_SQL + e.getMessage()); } catch (SQLParserException e) { logger.error(CHECK_METADATA_ERROR, e); return RpcUtils.getTSExecuteStatementResp( @@ -628,11 +646,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { /** * @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByTimePlan, some - * AuthorPlan + * AuthorPlan */ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning - private TSExecuteStatementResp internalExecuteQueryStatement(String statement, - long statementId, PhysicalPlan plan, int fetchSize, String username) throws IOException { + private TSExecuteStatementResp internalExecuteQueryStatement( + String statement, long statementId, PhysicalPlan plan, int fetchSize, String username) + throws IOException { queryCount.incrementAndGet(); auditLogger.debug("Session {} execute Query: {}", currSessionId.get(), statement); long startTime = System.currentTimeMillis(); @@ -646,7 +665,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } if (plan instanceof ShowTimeSeriesPlan) { - //If the user does not pass the limit, then set limit = fetchSize and haslimit=false,else set haslimit = true + // If the user does not pass the limit, then set limit = fetchSize and haslimit=false,else + // set haslimit = true if (((ShowTimeSeriesPlan) plan).getLimit() == 0) { ((ShowTimeSeriesPlan) plan).setLimit(fetchSize); ((ShowTimeSeriesPlan) plan).setHasLimit(false); @@ -667,36 +687,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } if (plan.getOperatorType() == OperatorType.AGGREGATION) { resp.setIgnoreTimeStamp(true); - // the actual row number of aggregation query is 1 - fetchSize = 1; } // else default ignoreTimeStamp is false - if (plan instanceof GroupByTimePlan) { - GroupByTimePlan groupByTimePlan = (GroupByTimePlan) plan; - // the actual row number of group by query should be calculated from startTime, endTime and interval. - fetchSize = Math.min( - (int) ((groupByTimePlan.getEndTime() - groupByTimePlan.getStartTime()) / groupByTimePlan - .getInterval()), fetchSize); - } - resp.setOperationType(plan.getOperatorType().toString()); - // get deduplicated path num - int deduplicatedPathNum = -1; - if (plan instanceof AlignByDevicePlan) { - deduplicatedPathNum = ((AlignByDevicePlan) plan).getMeasurements().size(); - } else if (plan instanceof LastQueryPlan) { - // dataset of last query consists of three column: time column + value column = 1 deduplicatedPathNum - // and we assume that the memory which sensor name takes equals to 1 deduplicatedPathNum - deduplicatedPathNum = 2; - // last query's actual row number should be the minimum between the number of series and fetchSize - fetchSize = Math.min(((LastQueryPlan) plan).getDeduplicatedPaths().size(), fetchSize); - } else if (plan instanceof RawDataQueryPlan) { - deduplicatedPathNum = ((RawDataQueryPlan) plan).getDeduplicatedPaths().size(); - } - // generate the queryId for the operation - queryId = generateQueryId(true, fetchSize, deduplicatedPathNum); + queryId = generateQueryId(true); if (plan instanceof QueryPlan && config.isEnablePerformanceTracing()) { if (!(plan instanceof AlignByDevicePlan)) { TracingManager.getInstance() @@ -707,7 +703,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } // put it into the corresponding Set - statementId2QueryId.computeIfAbsent(statementId, k -> new CopyOnWriteArraySet<>()) + statementId2QueryId + .computeIfAbsent(statementId, k -> new CopyOnWriteArraySet<>()) .add(queryId); if (plan instanceof AuthorPlan) { @@ -715,7 +712,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } // create and cache dataset QueryDataSet newDataSet = createQueryDataSet(queryId, plan); - if (plan instanceof QueryPlan && !((QueryPlan) plan).isAlignByTime() + if (plan instanceof QueryPlan + && !((QueryPlan) plan).isAlignByTime() && newDataSet instanceof NonAlignEngineDataSet) { TSQueryNonAlignDataSet result = fillRpcNonAlignReturnData(fetchSize, newDataSet, username); resp.setNonAlignQueryDataSet(result); @@ -768,11 +766,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { SLOW_SQL_LOGGER.info("Cost: " + costTime + " ms, sql is " + statement); } if (plan.isDebug()) { - SLOW_SQL_LOGGER.info("ChunkCache used memory proportion: " + ChunkCache.getInstance() - .getUsedMemoryProportion() + "\nChunkMetadataCache used memory proportion: " - + ChunkMetadataCache.getInstance().getUsedMemoryProportion() - + "\nTimeSeriesMetadataCache used memory proportion: " + TimeSeriesMetadataCache - .getInstance().getUsedMemoryProportion()); + SLOW_SQL_LOGGER.info( + "ChunkCache used memory proportion: " + + ChunkCache.getInstance().getUsedMemoryProportion() + + "\nChunkMetadataCache used memory proportion: " + + ChunkMetadataCache.getInstance().getUsedMemoryProportion() + + "\nTimeSeriesMetadataCache used memory proportion: " + + TimeSeriesMetadataCache.getInstance().getUsedMemoryProportion()); } } } @@ -839,14 +839,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return StaticResps.LIST_USER_PRIVILEGE_RESP; default: return RpcUtils.getTSExecuteStatementResp( - RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, + RpcUtils.getStatus( + TSStatusCode.SQL_PARSE_ERROR, String.format("%s is not an auth query", authorPlan.getAuthorType()))); } } - /** - * get ResultSet schema - */ + /** get ResultSet schema */ private TSExecuteStatementResp getQueryColumnHeaders(PhysicalPlan physicalPlan, String username) throws AuthException, TException, QueryProcessException, MetadataException { @@ -856,12 +855,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { // check permissions if (!checkAuthorization(physicalPlan.getPaths(), physicalPlan, username)) { return RpcUtils.getTSExecuteStatementResp( - RpcUtils.getStatus(TSStatusCode.NO_PERMISSION_ERROR, + RpcUtils.getStatus( + TSStatusCode.NO_PERMISSION_ERROR, "No permissions for this operation " + physicalPlan.getOperatorType())); } - TSExecuteStatementResp resp = RpcUtils - .getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS); + TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS); // align by device query QueryPlan plan = (QueryPlan) physicalPlan; @@ -869,12 +868,15 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { getAlignByDeviceQueryHeaders((AlignByDevicePlan) plan, respColumns, columnsTypes); } else if (plan instanceof LastQueryPlan) { // Last Query should return different respond instead of the static one - // because the query dataset and query id is different although the header of last query is same. + // because the query dataset and query id is different although the header of last query is + // same. return StaticResps.LAST_RESP.deepCopy(); } else if (plan instanceof AggregationPlan && ((AggregationPlan) plan).getLevel() >= 0) { - Map<String, Long> finalPaths = FilePathUtils - .getPathByLevel(((AggregationPlan) plan).getDeduplicatedPaths(), - ((AggregationPlan) plan).getLevel(), null); + Map<String, Long> finalPaths = + FilePathUtils.getPathByLevel( + ((AggregationPlan) plan).getDeduplicatedPaths(), + ((AggregationPlan) plan).getLevel(), + null); for (Map.Entry<String, Long> entry : finalPaths.entrySet()) { respColumns.add("count(" + entry.getKey() + ")"); columnsTypes.add(TSDataType.INT64.toString()); @@ -903,8 +905,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { for (PartialPath path : paths) { String column = path.getTsAlias(); if (column == null) { - column = path.getMeasurementAlias() != null ? path.getFullPathWithAlias() - : path.getFullPath(); + column = + path.getMeasurementAlias() != null + ? path.getFullPathWithAlias() + : path.getFullPath(); } respColumns.add(column); seriesTypes.add(getSeriesTypeByPath(path)); @@ -923,9 +927,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { PartialPath path = paths.get(i); String column = path.getTsAlias(); if (column == null) { - column = path.getMeasurementAlias() != null - ? aggregations.get(i) + "(" + paths.get(i).getFullPathWithAlias() + ")" - : aggregations.get(i) + "(" + paths.get(i).getFullPath() + ")"; + column = + path.getMeasurementAlias() != null + ? aggregations.get(i) + "(" + paths.get(i).getFullPathWithAlias() + ")" + : aggregations.get(i) + "(" + paths.get(i).getFullPath() + ")"; } respColumns.add(column); } @@ -1106,12 +1111,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return result; } - /** - * create QueryDataSet and buffer it for fetchResults - */ + /** create QueryDataSet and buffer it for fetchResults */ private QueryDataSet createQueryDataSet(long queryId, PhysicalPlan physicalPlan) throws QueryProcessException, QueryFilterOptimizationException, StorageEngineException, - IOException, MetadataException, SQLException, TException, InterruptedException { + IOException, MetadataException, SQLException, TException, InterruptedException { QueryContext context = genQueryContext(queryId, physicalPlan.isDebug()); QueryDataSet queryDataSet = executor.processQuery(physicalPlan, context); @@ -1147,7 +1150,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { status = executeNonQueryPlan(plan); TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(status); - long queryId = generateQueryId(false, DEFAULT_FETCH_SIZE, -1); + long queryId = generateQueryId(false); resp.setQueryId(queryId); return resp; } @@ -1165,8 +1168,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { PhysicalPlan physicalPlan; try { - physicalPlan = processor - .parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(sessionId), DEFAULT_FETCH_SIZE); + physicalPlan = + processor.parseSQLToPhysicalPlan( + statement, sessionIdZoneIdMap.get(sessionId), DEFAULT_FETCH_SIZE); } catch (QueryProcessException | SQLParserException e) { logger.warn(ERROR_PARSING_SQL, statement, e); return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR, e.getMessage()); @@ -1254,9 +1258,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { @Override public TSStatus insertRecords(TSInsertRecordsReq req) { if (auditLogger.isDebugEnabled()) { - auditLogger - .debug("Session {} insertRecords, first device {}, first time {}", currSessionId.get(), - req.deviceIds.get(0), req.getTimestamps().get(0)); + auditLogger.debug( + "Session {} insertRecords, first device {}, first time {}", + currSessionId.get(), + req.deviceIds.get(0), + req.getTimestamps().get(0)); } if (!checkLogin(req.getSessionId())) { logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME); @@ -1267,10 +1273,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { for (int i = 0; i < req.deviceIds.size(); i++) { try { - InsertRowPlan plan = new InsertRowPlan( - new PartialPath(req.getDeviceIds().get(i)), req.getTimestamps().get(i), - req.getMeasurementsList().get(i).toArray(new String[0]), req.valuesList.get(i) - ); + InsertRowPlan plan = + new InsertRowPlan( + new PartialPath(req.getDeviceIds().get(i)), + req.getTimestamps().get(i), + req.getMeasurementsList().get(i).toArray(new String[0]), + req.valuesList.get(i)); TSStatus status = checkAuthority(plan, req.getSessionId()); if (status != null) { statusList.add(status); @@ -1289,9 +1297,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { @Override public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) throws TException { if (auditLogger.isDebugEnabled()) { - auditLogger - .debug("Session {} insertRecords, device {}, first time {}", currSessionId.get(), - req.deviceId, req.getTimestamps().get(0)); + auditLogger.debug( + "Session {} insertRecords, device {}, first time {}", + currSessionId.get(), + req.deviceId, + req.getTimestamps().get(0)); } if (!checkLogin(req.getSessionId())) { logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME); @@ -1301,12 +1311,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { List<TSStatus> statusList = new ArrayList<>(); try { - InsertRowsOfOneDevicePlan plan = new InsertRowsOfOneDevicePlan( - new PartialPath(req.getDeviceId()), - req.getTimestamps().toArray(new Long[0]), - req.getMeasurementsList(), - req.getValuesList().toArray(new ByteBuffer[0]) - ); + InsertRowsOfOneDevicePlan plan = + new InsertRowsOfOneDevicePlan( + new PartialPath(req.getDeviceId()), + req.getTimestamps().toArray(new Long[0]), + req.getMeasurementsList(), + req.getValuesList().toArray(new ByteBuffer[0])); TSStatus status = checkAuthority(plan, req.getSessionId()); if (status != null) { statusList.add(status); @@ -1324,9 +1334,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { @Override public TSStatus insertStringRecords(TSInsertStringRecordsReq req) throws TException { if (auditLogger.isDebugEnabled()) { - auditLogger - .debug("Session {} insertRecords, first device {}, first time {}", currSessionId.get(), - req.deviceIds.get(0), req.getTimestamps().get(0)); + auditLogger.debug( + "Session {} insertRecords, first device {}, first time {}", + currSessionId.get(), + req.deviceIds.get(0), + req.getTimestamps().get(0)); } if (!checkLogin(req.getSessionId())) { logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME); @@ -1390,7 +1402,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } @Override - public TSStatus testInsertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) throws TException { + public TSStatus testInsertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) + throws TException { logger.debug("Test insert rows in batch request receive."); return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } @@ -1404,18 +1417,22 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { @Override public TSStatus insertRecord(TSInsertRecordReq req) { try { - auditLogger - .debug("Session {} insertRecord, device {}, time {}", currSessionId.get(), - req.getDeviceId(), req.getTimestamp()); + auditLogger.debug( + "Session {} insertRecord, device {}, time {}", + currSessionId.get(), + req.getDeviceId(), + req.getTimestamp()); if (!checkLogin(req.getSessionId())) { logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME); return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR); } - InsertRowPlan plan = new InsertRowPlan( - new PartialPath(req.getDeviceId()), req.getTimestamp(), - req.getMeasurements().toArray(new String[0]), req.values - ); + InsertRowPlan plan = + new InsertRowPlan( + new PartialPath(req.getDeviceId()), + req.getTimestamp(), + req.getMeasurements().toArray(new String[0]), + req.values); TSStatus status = checkAuthority(plan, req.getSessionId()); if (status != null) { @@ -1431,9 +1448,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { @Override public TSStatus insertStringRecord(TSInsertStringRecordReq req) throws TException { try { - auditLogger - .debug("Session {} insertRecord, device {}, time {}", currSessionId.get(), - req.getDeviceId(), req.getTimestamp()); + auditLogger.debug( + "Session {} insertRecord, device {}, time {}", + currSessionId.get(), + req.getDeviceId(), + req.getTimestamp()); if (!checkLogin(req.getSessionId())) { logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME); return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR); @@ -1495,8 +1514,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR); } - InsertTabletPlan insertTabletPlan = new InsertTabletPlan(new PartialPath(req.deviceId), - req.measurements); + InsertTabletPlan insertTabletPlan = + new InsertTabletPlan(new PartialPath(req.deviceId), req.measurements); insertTabletPlan.setTimes(QueryDataSetUtils.readTimesFromBuffer(req.timestamps, req.size)); insertTabletPlan.setColumns( QueryDataSetUtils.readValuesFromBuffer( @@ -1512,8 +1531,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return executeNonQueryPlan(insertTabletPlan); } catch (Exception e) { logger.error("{}: error occurs when executing statements", IoTDBConstant.GLOBAL_DB_NAME, e); - return RpcUtils - .getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()); + return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()); } finally { Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1); } @@ -1530,14 +1548,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { List<TSStatus> statusList = new ArrayList<>(); for (int i = 0; i < req.deviceIds.size(); i++) { - InsertTabletPlan insertTabletPlan = new InsertTabletPlan( - new PartialPath(req.deviceIds.get(i)), - req.measurementsList.get(i)); + InsertTabletPlan insertTabletPlan = + new InsertTabletPlan( + new PartialPath(req.deviceIds.get(i)), req.measurementsList.get(i)); insertTabletPlan.setTimes( QueryDataSetUtils.readTimesFromBuffer(req.timestampsList.get(i), req.sizeList.get(i))); insertTabletPlan.setColumns( QueryDataSetUtils.readValuesFromBuffer( - req.valuesList.get(i), req.typesList.get(i), req.measurementsList.get(i).size(), + req.valuesList.get(i), + req.typesList.get(i), + req.measurementsList.get(i).size(), req.sizeList.get(i))); insertTabletPlan.setRowCount(req.sizeList.get(i)); insertTabletPlan.setDataTypes(req.typesList.get(i)); @@ -1553,8 +1573,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return RpcUtils.getStatus(statusList); } catch (Exception e) { logger.error("{}: error occurs when insertTablets", IoTDBConstant.GLOBAL_DB_NAME, e); - return RpcUtils - .getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()); + return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()); } finally { Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1); } @@ -1615,10 +1634,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { auditLogger.debug("Session-{} create timeseries {}", currSessionId.get(), req.getPath()); } - CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan(new PartialPath(req.path), - TSDataType.values()[req.dataType], TSEncoding.values()[req.encoding], - CompressionType.values()[req.compressor], req.props, req.tags, req.attributes, - req.measurementAlias); + CreateTimeSeriesPlan plan = + new CreateTimeSeriesPlan( + new PartialPath(req.path), + TSDataType.values()[req.dataType], + TSEncoding.values()[req.encoding], + CompressionType.values()[req.compressor], + req.props, + req.tags, + req.attributes, + req.measurementAlias); TSStatus status = checkAuthority(plan, req.getSessionId()); if (status != null) { return status; @@ -1639,8 +1664,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR); } if (auditLogger.isDebugEnabled()) { - auditLogger.debug("Session-{} create {} timeseries, the first is {}", currSessionId.get(), - req.getPaths().size(), req.getPaths().get(0)); + auditLogger.debug( + "Session-{} create {} timeseries, the first is {}", + currSessionId.get(), + req.getPaths().size(), + req.getPaths().get(0)); } List<TSStatus> statusList = new ArrayList<>(req.paths.size()); CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan(); @@ -1717,11 +1745,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { if (createMultiTimeSeriesPlan.getResults().entrySet().size() > 0) { isAllSuccessful = false; - for (Map.Entry<Integer, Exception> entry : createMultiTimeSeriesPlan.getResults() - .entrySet()) { - statusList.set(entry.getKey(), - RpcUtils - .getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, entry.getValue().getMessage())); + for (Map.Entry<Integer, Exception> entry : + createMultiTimeSeriesPlan.getResults().entrySet()) { + statusList.set( + entry.getKey(), + RpcUtils.getStatus( + TSStatusCode.EXECUTE_STATEMENT_ERROR, entry.getValue().getMessage())); } } @@ -1766,7 +1795,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { @Override public long requestStatementId(long sessionId) { long statementId = statementIdGenerator.incrementAndGet(); - sessionId2StatementId.computeIfAbsent(sessionId, s -> new CopyOnWriteArraySet<>()) + sessionId2StatementId + .computeIfAbsent(sessionId, s -> new CopyOnWriteArraySet<>()) .add(statementId); return statementId; } @@ -1808,19 +1838,15 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR); } - - private long generateQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) { - return QueryResourceManager.getInstance() - .assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum); + private long generateQueryId(boolean isDataQuery) { + return QueryResourceManager.getInstance().assignQueryId(isDataQuery); } - protected List<TSDataType> getSeriesTypesByPaths(List<PartialPath> paths, - List<String> aggregations) - throws MetadataException { + protected List<TSDataType> getSeriesTypesByPaths( + List<PartialPath> paths, List<String> aggregations) throws MetadataException { return SchemaUtils.getSeriesTypesByPaths(paths, aggregations); } - protected TSDataType getSeriesTypeByPath(PartialPath path) throws MetadataException { return SchemaUtils.getSeriesTypeByPaths(path); } diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java index 415fae6..a822949 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSequenceDataQueryIT.java @@ -18,17 +18,6 @@ */ package org.apache.iotdb.db.integration; -import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT; -import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.List; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.constant.TestConstant; import org.apache.iotdb.db.exception.StorageEngineException; @@ -52,10 +41,23 @@ import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression; import org.apache.iotdb.tsfile.read.filter.TimeFilter; import org.apache.iotdb.tsfile.read.filter.ValueFilter; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; + import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT; +import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + /** * Notice that, all test begins with "IoTDB" is integration test. All test which will start the * IoTDB server should be defined as integration test. In this test case, no unseq insert data. @@ -104,8 +106,9 @@ public class IoTDBSequenceDataQueryIT { private static void insertData() throws ClassNotFoundException { Class.forName(Config.JDBC_DRIVER_NAME); - try (Connection connection = DriverManager - .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + try (Connection connection = + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { // create storage group and measurement @@ -115,17 +118,22 @@ public class IoTDBSequenceDataQueryIT { // insert data (time from 300-999) for (long time = 300; time < 1000; time++) { - String sql = String - .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time % 17); + String sql = + String.format( + "insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time % 17); statement.execute(sql); - sql = String - .format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, time % 29); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, time % 29); statement.execute(sql); - sql = String - .format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, time % 31); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, time % 31); statement.execute(sql); - sql = String.format("insert into root.vehicle.d0(timestamp,s3) values(%s,'%s')", time, - TestConstant.stringValue[(int) time % 5]); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s3) values(%s,'%s')", + time, TestConstant.stringValue[(int) time % 5]); statement.execute(sql); if (time % 17 >= 14) { @@ -139,21 +147,26 @@ public class IoTDBSequenceDataQueryIT { for (long time = 1200; time < 1500; time++) { String sql; if (time % 2 == 0) { - sql = String - .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time % 17); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time % 17); statement.execute(sql); - sql = String - .format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, time % 29); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, time % 29); statement.execute(sql); if (time % 17 >= 14) { count++; } } - sql = String - .format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, time % 31); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, time % 31); statement.execute(sql); - sql = String.format("insert into root.vehicle.d0(timestamp,s3) values(%s,'%s')", time, - TestConstant.stringValue[(int) time % 5]); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s3) values(%s,'%s')", + time, TestConstant.stringValue[(int) time % 5]); statement.execute(sql); } @@ -170,23 +183,29 @@ public class IoTDBSequenceDataQueryIT { QueryRouter queryRouter = new QueryRouter(); List<PartialPath> pathList = new ArrayList<>(); List<TSDataType> dataTypes = new ArrayList<>(); - pathList.add(new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0)); + pathList.add( + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0)); dataTypes.add(TSDataType.INT32); - pathList.add(new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s1)); + pathList.add( + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s1)); dataTypes.add(TSDataType.INT64); - pathList.add(new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s2)); + pathList.add( + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s2)); dataTypes.add(TSDataType.FLOAT); - pathList.add(new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s3)); + pathList.add( + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s3)); dataTypes.add(TSDataType.TEXT); - pathList.add(new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s4)); + pathList.add( + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s4)); dataTypes.add(TSDataType.BOOLEAN); - pathList.add(new PartialPath(TestConstant.d1 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0)); + pathList.add( + new PartialPath(TestConstant.d1 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0)); dataTypes.add(TSDataType.INT32); - pathList.add(new PartialPath(TestConstant.d1 + TsFileConstant.PATH_SEPARATOR + TestConstant.s1)); + pathList.add( + new PartialPath(TestConstant.d1 + TsFileConstant.PATH_SEPARATOR + TestConstant.s1)); dataTypes.add(TSDataType.INT64); - TEST_QUERY_JOB_ID = QueryResourceManager.getInstance() - .assignQueryId(true, 1024, pathList.size()); + TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true); TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID); RawDataQueryPlan queryPlan = new RawDataQueryPlan(); queryPlan.setDeduplicatedDataTypes(dataTypes); @@ -209,16 +228,18 @@ public class IoTDBSequenceDataQueryIT { QueryRouter queryRouter = new QueryRouter(); List<PartialPath> pathList = new ArrayList<>(); List<TSDataType> dataTypes = new ArrayList<>(); - pathList.add(new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0)); + pathList.add( + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0)); dataTypes.add(TSDataType.INT32); - pathList.add(new PartialPath(TestConstant.d1 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0)); + pathList.add( + new PartialPath(TestConstant.d1 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0)); dataTypes.add(TSDataType.INT32); - pathList.add(new PartialPath(TestConstant.d1 + TsFileConstant.PATH_SEPARATOR + TestConstant.s1)); + pathList.add( + new PartialPath(TestConstant.d1 + TsFileConstant.PATH_SEPARATOR + TestConstant.s1)); dataTypes.add(TSDataType.INT64); GlobalTimeExpression globalTimeExpression = new GlobalTimeExpression(TimeFilter.gtEq(800L)); - TEST_QUERY_JOB_ID = QueryResourceManager.getInstance() - .assignQueryId(true, 1024, pathList.size()); + TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true); TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID); RawDataQueryPlan queryPlan = new RawDataQueryPlan(); @@ -247,27 +268,34 @@ public class IoTDBSequenceDataQueryIT { QueryRouter queryRouter = new QueryRouter(); List<PartialPath> pathList = new ArrayList<>(); List<TSDataType> dataTypes = new ArrayList<>(); - pathList.add(new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0)); + pathList.add( + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0)); dataTypes.add(TSDataType.INT32); - pathList.add(new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s1)); + pathList.add( + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s1)); dataTypes.add(TSDataType.INT64); - pathList.add(new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s2)); + pathList.add( + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s2)); dataTypes.add(TSDataType.FLOAT); - pathList.add(new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s3)); + pathList.add( + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s3)); dataTypes.add(TSDataType.TEXT); - pathList.add(new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s4)); + pathList.add( + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s4)); dataTypes.add(TSDataType.BOOLEAN); - pathList.add(new PartialPath(TestConstant.d1 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0)); + pathList.add( + new PartialPath(TestConstant.d1 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0)); dataTypes.add(TSDataType.INT32); - pathList.add(new PartialPath(TestConstant.d1 + TsFileConstant.PATH_SEPARATOR + TestConstant.s1)); + pathList.add( + new PartialPath(TestConstant.d1 + TsFileConstant.PATH_SEPARATOR + TestConstant.s1)); dataTypes.add(TSDataType.INT64); - Path queryPath = new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0); - SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(queryPath, - ValueFilter.gtEq(14)); + Path queryPath = + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0); + SingleSeriesExpression singleSeriesExpression = + new SingleSeriesExpression(queryPath, ValueFilter.gtEq(14)); - TEST_QUERY_JOB_ID = QueryResourceManager.getInstance() - .assignQueryId(true, 1024, pathList.size()); + TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true); TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID); RawDataQueryPlan queryPlan = new RawDataQueryPlan(); @@ -285,5 +313,4 @@ public class IoTDBSequenceDataQueryIT { QueryResourceManager.getInstance().endQuery(TEST_QUERY_JOB_ID); } - } diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java index ba2a46d..55ec4c8 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java @@ -18,21 +18,6 @@ */ package org.apache.iotdb.db.integration; -import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT; -import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.constant.TestConstant; import org.apache.iotdb.db.engine.compaction.CompactionStrategy; @@ -54,11 +39,28 @@ import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression; import org.apache.iotdb.tsfile.read.filter.TimeFilter; import org.apache.iotdb.tsfile.read.filter.ValueFilter; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; + import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT; +import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + /** * Notice that, all test begins with "IoTDB" is integration test. All test which will start the * IoTDB server should be defined as integration test. @@ -86,9 +88,10 @@ public class IoTDBSeriesReaderIT { tsFileConfig.setMaxNumberOfPointsInPage(1000); tsFileConfig.setPageSizeInByte(1024 * 1024 * 150); tsFileConfig.setGroupSizeInByte(1024 * 1024 * 150); - prevChunkMergePointThreshold = IoTDBDescriptor.getInstance().getConfig() - .getMergeChunkPointNumberThreshold(); - IoTDBDescriptor.getInstance().getConfig() + prevChunkMergePointThreshold = + IoTDBDescriptor.getInstance().getConfig().getMergeChunkPointNumberThreshold(); + IoTDBDescriptor.getInstance() + .getConfig() .setCompactionStrategy(CompactionStrategy.NO_COMPACTION); IoTDBDescriptor.getInstance().getConfig().setMergeChunkPointNumberThreshold(Integer.MAX_VALUE); IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 16); @@ -100,9 +103,8 @@ public class IoTDBSeriesReaderIT { EnvironmentUtils.envSetUp(); insertData(); - connection = DriverManager - .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); - + connection = + DriverManager.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); } @AfterClass @@ -114,18 +116,21 @@ public class IoTDBSeriesReaderIT { tsFileConfig.setGroupSizeInByte(groupSizeInByte); EnvironmentUtils.cleanEnv(); - IoTDBDescriptor.getInstance().getConfig() + IoTDBDescriptor.getInstance() + .getConfig() .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION); IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte); IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(prevPartitionInterval); - IoTDBDescriptor.getInstance().getConfig() + IoTDBDescriptor.getInstance() + .getConfig() .setMergeChunkPointNumberThreshold(prevChunkMergePointThreshold); } private static void insertData() throws ClassNotFoundException { Class.forName(Config.JDBC_DRIVER_NAME); - try (Connection connection = DriverManager - .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + try (Connection connection = + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { for (String sql : TestConstant.create_sql) { @@ -134,20 +139,27 @@ public class IoTDBSeriesReaderIT { // insert large amount of data time range : 3000 ~ 13600 for (int time = 3000; time < 13600; time++) { - String sql = String - .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time % 100); + String sql = + String.format( + "insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time % 100); statement.execute(sql); - sql = String - .format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, time % 17); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, time % 17); statement.execute(sql); - sql = String - .format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, time % 22); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, time % 22); statement.execute(sql); - sql = String.format("insert into root.vehicle.d0(timestamp,s3) values(%s,'%s')", time, - TestConstant.stringValue[time % 5]); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s3) values(%s,'%s')", + time, TestConstant.stringValue[time % 5]); statement.execute(sql); - sql = String.format("insert into root.vehicle.d0(timestamp,s4) values(%s, %s)", time, - TestConstant.booleanValue[time % 2]); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s4) values(%s, %s)", + time, TestConstant.booleanValue[time % 2]); statement.execute(sql); sql = String.format("insert into root.vehicle.d0(timestamp,s5) values(%s, %s)", time, time); statement.execute(sql); @@ -158,14 +170,17 @@ public class IoTDBSeriesReaderIT { // insert large amount of data time range : 13700 ~ 24000 for (int time = 13700; time < 24000; time++) { - String sql = String - .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time % 70); + String sql = + String.format( + "insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time % 70); statement.execute(sql); - sql = String - .format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, time % 40); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, time % 40); statement.execute(sql); - sql = String - .format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, time % 123); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, time % 123); statement.execute(sql); } @@ -174,28 +189,34 @@ public class IoTDBSeriesReaderIT { // buffwrite data, unsealed file for (int time = 100000; time < 101000; time++) { - String sql = String - .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time % 20); + String sql = + String.format( + "insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time % 20); statement.execute(sql); - sql = String - .format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, time % 30); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, time % 30); statement.execute(sql); - sql = String - .format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, time % 77); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, time % 77); statement.execute(sql); } // sequential data, memory data for (int time = 200000; time < 201000; time++) { - String sql = String - .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, -time % 20); + String sql = + String.format( + "insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, -time % 20); statement.execute(sql); - sql = String - .format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, -time % 30); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, -time % 30); statement.execute(sql); - sql = String - .format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, -time % 77); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, -time % 77); statement.execute(sql); } @@ -203,29 +224,31 @@ public class IoTDBSeriesReaderIT { // unsequence insert, time < 3000 for (int time = 2000; time < 2500; time++) { - String sql = String - .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time); + String sql = + String.format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, time); statement.execute(sql); - sql = String - .format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, time + 1); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, time + 1); statement.execute(sql); - sql = String - .format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, time + 2); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, time + 2); statement.execute(sql); - sql = String.format("insert into root.vehicle.d0(timestamp,s3) values(%s,'%s')", time, - TestConstant.stringValue[time % 5]); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s3) values(%s,'%s')", + time, TestConstant.stringValue[time % 5]); statement.execute(sql); } for (int time = 100000; time < 100500; time++) { - String sql = String - .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, 666); + String sql = + String.format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, 666); statement.execute(sql); - sql = String - .format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, 777); + sql = String.format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, 777); statement.execute(sql); - sql = String - .format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, 888); + sql = String.format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, 888); statement.execute(sql); } @@ -233,18 +256,21 @@ public class IoTDBSeriesReaderIT { // unsequence insert, time > 200000 for (int time = 200900; time < 201000; time++) { - String sql = String - .format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, 6666); + String sql = + String.format("insert into root.vehicle.d0(timestamp,s0) values(%s,%s)", time, 6666); statement.execute(sql); sql = String.format("insert into root.vehicle.d0(timestamp,s1) values(%s,%s)", time, 7777); statement.execute(sql); sql = String.format("insert into root.vehicle.d0(timestamp,s2) values(%s,%s)", time, 8888); statement.execute(sql); - sql = String - .format("insert into root.vehicle.d0(timestamp,s3) values(%s,'%s')", time, "goodman"); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s3) values(%s,'%s')", time, "goodman"); statement.execute(sql); - sql = String.format("insert into root.vehicle.d0(timestamp,s4) values(%s, %s)", time, - TestConstant.booleanValue[time % 2]); + sql = + String.format( + "insert into root.vehicle.d0(timestamp,s4) values(%s, %s)", + time, TestConstant.booleanValue[time % 2]); statement.execute(sql); sql = String.format("insert into root.vehicle.d0(timestamp,s5) values(%s, %s)", time, 9999); statement.execute(sql); @@ -262,25 +288,32 @@ public class IoTDBSeriesReaderIT { QueryRouter queryRouter = new QueryRouter(); List<PartialPath> pathList = new ArrayList<>(); List<TSDataType> dataTypes = new ArrayList<>(); - pathList.add(new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0)); + pathList.add( + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0)); dataTypes.add(TSDataType.INT32); - pathList.add(new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s1)); + pathList.add( + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s1)); dataTypes.add(TSDataType.INT64); - pathList.add(new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s2)); + pathList.add( + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s2)); dataTypes.add(TSDataType.FLOAT); - pathList.add(new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s3)); + pathList.add( + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s3)); dataTypes.add(TSDataType.TEXT); - pathList.add(new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s4)); + pathList.add( + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s4)); dataTypes.add(TSDataType.BOOLEAN); - pathList.add(new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s5)); + pathList.add( + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s5)); dataTypes.add(TSDataType.DOUBLE); - pathList.add(new PartialPath(TestConstant.d1 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0)); + pathList.add( + new PartialPath(TestConstant.d1 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0)); dataTypes.add(TSDataType.INT32); - pathList.add(new PartialPath(TestConstant.d1 + TsFileConstant.PATH_SEPARATOR + TestConstant.s1)); + pathList.add( + new PartialPath(TestConstant.d1 + TsFileConstant.PATH_SEPARATOR + TestConstant.s1)); dataTypes.add(TSDataType.INT64); - TEST_QUERY_JOB_ID = QueryResourceManager.getInstance() - .assignQueryId(true, 1024, pathList.size()); + TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true); TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID); RawDataQueryPlan queryPlan = new RawDataQueryPlan(); @@ -304,14 +337,14 @@ public class IoTDBSeriesReaderIT { QueryRouter queryRouter = new QueryRouter(); List<PartialPath> pathList = new ArrayList<>(); List<TSDataType> dataTypes = new ArrayList<>(); - PartialPath p = new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0); + PartialPath p = + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0); pathList.add(p); dataTypes.add(TSDataType.INT32); - SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(p, - ValueFilter.gtEq(20)); + SingleSeriesExpression singleSeriesExpression = + new SingleSeriesExpression(p, ValueFilter.gtEq(20)); - TEST_QUERY_JOB_ID = QueryResourceManager.getInstance() - .assignQueryId(true, 1024, pathList.size()); + TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true); TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID); RawDataQueryPlan queryPlan = new RawDataQueryPlan(); @@ -334,11 +367,12 @@ public class IoTDBSeriesReaderIT { public void seriesTimeDigestReadTest() throws IOException, StorageEngineException, QueryProcessException, IllegalPathException { QueryRouter queryRouter = new QueryRouter(); - PartialPath path = new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0); + PartialPath path = + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0); List<TSDataType> dataTypes = Collections.singletonList(TSDataType.INT32); SingleSeriesExpression expression = new SingleSeriesExpression(path, TimeFilter.gt(22987L)); - TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true, 1024, 1); + TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true); TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID); RawDataQueryPlan queryPlan = new RawDataQueryPlan(); @@ -361,8 +395,10 @@ public class IoTDBSeriesReaderIT { public void crossSeriesReadUpdateTest() throws IOException, StorageEngineException, QueryProcessException, IllegalPathException { QueryRouter queryRouter = new QueryRouter(); - PartialPath path1 = new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0); - PartialPath path2 = new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s1); + PartialPath path1 = + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s0); + PartialPath path2 = + new PartialPath(TestConstant.d0 + TsFileConstant.PATH_SEPARATOR + TestConstant.s1); RawDataQueryPlan queryPlan = new RawDataQueryPlan(); @@ -376,12 +412,11 @@ public class IoTDBSeriesReaderIT { dataTypes.add(TSDataType.INT64); queryPlan.setDeduplicatedDataTypes(dataTypes); - TEST_QUERY_JOB_ID = QueryResourceManager.getInstance() - .assignQueryId(true, 1024, pathList.size()); + TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true); TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID); - SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(path1, - ValueFilter.lt(111)); + SingleSeriesExpression singleSeriesExpression = + new SingleSeriesExpression(path1, ValueFilter.lt(111)); queryPlan.setExpression(singleSeriesExpression); QueryDataSet queryDataSet = queryRouter.rawDataQuery(queryPlan, TEST_QUERY_CONTEXT); @@ -400,8 +435,8 @@ public class IoTDBSeriesReaderIT { @Test public void queryEmptySeriesTest() throws SQLException { Statement statement = connection.createStatement(); - statement - .execute("CREATE TIMESERIES root.vehicle.d_empty.s1 WITH DATATYPE=INT64, ENCODING=RLE"); + statement.execute( + "CREATE TIMESERIES root.vehicle.d_empty.s1 WITH DATATYPE=INT64, ENCODING=RLE"); ResultSet resultSet = statement.executeQuery("select * from root.vehicle.d_empty"); try { assertFalse(resultSet.next()); @@ -414,8 +449,8 @@ public class IoTDBSeriesReaderIT { @Test public void queryWithLongRangeUnSeqTest() throws SQLException { try (Connection connection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { // make up data final String INSERT_TEMPLATE = "insert into root.sg.d1(time, s1) values(%d, %d)"; diff --git a/server/src/test/java/org/apache/iotdb/db/qp/plan/LogicalPlanSmallTest.java b/server/src/test/java/org/apache/iotdb/db/qp/plan/LogicalPlanSmallTest.java index f048a28..b114fa3 100644 --- a/server/src/test/java/org/apache/iotdb/db/qp/plan/LogicalPlanSmallTest.java +++ b/server/src/test/java/org/apache/iotdb/db/qp/plan/LogicalPlanSmallTest.java @@ -18,9 +18,6 @@ */ package org.apache.iotdb.db.qp.plan; -import java.time.ZoneId; -import java.util.ArrayList; -import org.antlr.v4.runtime.misc.ParseCancellationException; import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.query.LogicalOptimizeException; import org.apache.iotdb.db.exception.query.QueryProcessException; @@ -35,10 +32,15 @@ import org.apache.iotdb.db.qp.logical.sys.SetStorageGroupOperator; import org.apache.iotdb.db.qp.strategy.LogicalGenerator; import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer; import org.apache.iotdb.db.service.IoTDB; + +import org.antlr.v4.runtime.misc.ParseCancellationException; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.time.ZoneId; +import java.util.ArrayList; + public class LogicalPlanSmallTest { private LogicalGenerator logicalGenerator; @@ -51,8 +53,8 @@ public class LogicalPlanSmallTest { @Test public void testLimit() { String sqlStr = "select * from root.vehicle.d1 limit 10"; - RootOperator operator = (RootOperator) logicalGenerator - .generate(sqlStr, ZoneId.systemDefault()); + RootOperator operator = + (RootOperator) logicalGenerator.generate(sqlStr, ZoneId.systemDefault()); Assert.assertEquals(QueryOperator.class, operator.getClass()); Assert.assertEquals(10, ((QueryOperator) operator).getRowLimit()); Assert.assertEquals(0, ((QueryOperator) operator).getRowOffset()); @@ -63,8 +65,8 @@ public class LogicalPlanSmallTest { @Test public void testOffset() { String sqlStr = "select * from root.vehicle.d1 limit 10 offset 20"; - RootOperator operator = (RootOperator) logicalGenerator - .generate(sqlStr, ZoneId.systemDefault()); + RootOperator operator = + (RootOperator) logicalGenerator.generate(sqlStr, ZoneId.systemDefault()); Assert.assertEquals(QueryOperator.class, operator.getClass()); Assert.assertEquals(10, ((QueryOperator) operator).getRowLimit()); Assert.assertEquals(20, ((QueryOperator) operator).getRowOffset()); @@ -75,8 +77,8 @@ public class LogicalPlanSmallTest { @Test public void testSlimit() { String sqlStr = "select * from root.vehicle.d1 limit 10 slimit 1"; - RootOperator operator = (RootOperator) logicalGenerator - .generate(sqlStr, ZoneId.systemDefault()); + RootOperator operator = + (RootOperator) logicalGenerator.generate(sqlStr, ZoneId.systemDefault()); Assert.assertEquals(QueryOperator.class, operator.getClass()); Assert.assertEquals(10, ((QueryOperator) operator).getRowLimit()); Assert.assertEquals(0, ((QueryOperator) operator).getRowOffset()); @@ -86,9 +88,10 @@ public class LogicalPlanSmallTest { @Test public void testSOffset() { - String sqlStr = "select * from root.vehicle.d1 where s1 < 20 and time <= now() limit 50 slimit 10 soffset 100"; - RootOperator operator = (RootOperator) logicalGenerator - .generate(sqlStr, ZoneId.systemDefault()); + String sqlStr = + "select * from root.vehicle.d1 where s1 < 20 and time <= now() limit 50 slimit 10 soffset 100"; + RootOperator operator = + (RootOperator) logicalGenerator.generate(sqlStr, ZoneId.systemDefault()); Assert.assertEquals(QueryOperator.class, operator.getClass()); Assert.assertEquals(50, ((QueryOperator) operator).getRowLimit()); Assert.assertEquals(0, ((QueryOperator) operator).getRowOffset()); @@ -98,9 +101,10 @@ public class LogicalPlanSmallTest { @Test public void testSOffsetTimestamp() { - String sqlStr = "select * from root.vehicle.d1 where s1 < 20 and timestamp <= now() limit 50 slimit 10 soffset 100"; - RootOperator operator = (RootOperator) logicalGenerator - .generate(sqlStr, ZoneId.systemDefault()); + String sqlStr = + "select * from root.vehicle.d1 where s1 < 20 and timestamp <= now() limit 50 slimit 10 soffset 100"; + RootOperator operator = + (RootOperator) logicalGenerator.generate(sqlStr, ZoneId.systemDefault()); Assert.assertEquals(QueryOperator.class, operator.getClass()); Assert.assertEquals(50, ((QueryOperator) operator).getRowLimit()); Assert.assertEquals(0, ((QueryOperator) operator).getRowOffset()); @@ -110,88 +114,98 @@ public class LogicalPlanSmallTest { @Test(expected = SQLParserException.class) public void testLimitOutOfRange() { - String sqlStr = "select * from root.vehicle.d1 where s1 < 20 and time <= now() limit 1111111111111111111111"; - RootOperator operator = (RootOperator) logicalGenerator - .generate(sqlStr, ZoneId.systemDefault()); + String sqlStr = + "select * from root.vehicle.d1 where s1 < 20 and time <= now() limit 1111111111111111111111"; + RootOperator operator = + (RootOperator) logicalGenerator.generate(sqlStr, ZoneId.systemDefault()); // expected to throw SQLParserException: Out of range. LIMIT <N>: N should be Int32. } @Test(expected = SQLParserException.class) public void testLimitNotPositive() { String sqlStr = "select * from root.vehicle.d1 where s1 < 20 and time <= now() limit 0"; - RootOperator operator = (RootOperator) logicalGenerator - .generate(sqlStr, ZoneId.systemDefault()); + RootOperator operator = + (RootOperator) logicalGenerator.generate(sqlStr, ZoneId.systemDefault()); // expected to throw SQLParserException: LIMIT <N>: N should be greater than 0. } @Test(expected = SQLParserException.class) public void testOffsetOutOfRange() { - String sqlStr = "select * from root.vehicle.d1 where s1 < 20 and time <= now() " - + "limit 1 offset 1111111111111111111111"; - RootOperator operator = (RootOperator) logicalGenerator - .generate(sqlStr, ZoneId.systemDefault()); - // expected to throw SQLParserException: Out of range. OFFSET <OFFSETValue>: OFFSETValue should be Int32. + String sqlStr = + "select * from root.vehicle.d1 where s1 < 20 and time <= now() " + + "limit 1 offset 1111111111111111111111"; + RootOperator operator = + (RootOperator) logicalGenerator.generate(sqlStr, ZoneId.systemDefault()); + // expected to throw SQLParserException: Out of range. OFFSET <OFFSETValue>: OFFSETValue should + // be Int32. } @Test(expected = ParseCancellationException.class) public void testOffsetNotPositive() { - String sqlStr = "select * from root.vehicle.d1 where s1 < 20 and time <= now() limit 1 offset -1"; - RootOperator operator = (RootOperator) logicalGenerator - .generate(sqlStr, ZoneId.systemDefault()); + String sqlStr = + "select * from root.vehicle.d1 where s1 < 20 and time <= now() limit 1 offset -1"; + RootOperator operator = + (RootOperator) logicalGenerator.generate(sqlStr, ZoneId.systemDefault()); // expected to throw SQLParserException: OFFSET <OFFSETValue>: OFFSETValue should >= 0. } @Test(expected = SQLParserException.class) public void testSlimitOutOfRange() { - String sqlStr = "select * from root.vehicle.d1 where s1 < 20 and time <= now() slimit 1111111111111111111111"; - RootOperator operator = (RootOperator) logicalGenerator - .generate(sqlStr, ZoneId.systemDefault()); + String sqlStr = + "select * from root.vehicle.d1 where s1 < 20 and time <= now() slimit 1111111111111111111111"; + RootOperator operator = + (RootOperator) logicalGenerator.generate(sqlStr, ZoneId.systemDefault()); // expected to throw SQLParserException: Out of range. SLIMIT <SN>: SN should be Int32. } @Test(expected = SQLParserException.class) public void testSlimitNotPositive() { String sqlStr = "select * from root.vehicle.d1 where s1 < 20 and time <= now() slimit 0"; - RootOperator operator = (RootOperator) logicalGenerator - .generate(sqlStr, ZoneId.systemDefault()); + RootOperator operator = + (RootOperator) logicalGenerator.generate(sqlStr, ZoneId.systemDefault()); // expected to throw SQLParserException: SLIMIT <SN>: SN should be greater than 0. } @Test(expected = SQLParserException.class) public void testSoffsetOutOfRange() { - String sqlStr = "select * from root.vehicle.d1 where s1 < 20 and time <= now() " - + "slimit 1 soffset 1111111111111111111111"; - RootOperator operator = (RootOperator) logicalGenerator - .generate(sqlStr, ZoneId.systemDefault()); - // expected to throw SQLParserException: Out of range. SOFFSET <SOFFSETValue>: SOFFSETValue should be Int32. + String sqlStr = + "select * from root.vehicle.d1 where s1 < 20 and time <= now() " + + "slimit 1 soffset 1111111111111111111111"; + RootOperator operator = + (RootOperator) logicalGenerator.generate(sqlStr, ZoneId.systemDefault()); + // expected to throw SQLParserException: Out of range. SOFFSET <SOFFSETValue>: SOFFSETValue + // should be Int32. } @Test public void testSoffsetNotPositive() { - String sqlStr = "select * from root.vehicle.d1 where s1 < 20 and time <= now() slimit 1 soffset 1"; - RootOperator operator = (RootOperator) logicalGenerator - .generate(sqlStr, ZoneId.systemDefault()); + String sqlStr = + "select * from root.vehicle.d1 where s1 < 20 and time <= now() slimit 1 soffset 1"; + RootOperator operator = + (RootOperator) logicalGenerator.generate(sqlStr, ZoneId.systemDefault()); Assert.assertEquals(1, ((QueryOperator) operator).getSeriesOffset()); Assert.assertEquals(1, ((QueryOperator) operator).getSeriesLimit()); } @Test(expected = LogicalOptimizeException.class) public void testSoffsetExceedColumnNum() throws QueryProcessException { - String sqlStr = "select s1 from root.vehicle.d1 where s1 < 20 and time <= now() slimit 2 soffset 1"; - RootOperator operator = (RootOperator) logicalGenerator - .generate(sqlStr, ZoneId.systemDefault()); + String sqlStr = + "select s1 from root.vehicle.d1 where s1 < 20 and time <= now() slimit 2 soffset 1"; + RootOperator operator = + (RootOperator) logicalGenerator.generate(sqlStr, ZoneId.systemDefault()); IoTDB.metaManager.init(); ConcatPathOptimizer concatPathOptimizer = new ConcatPathOptimizer(); - concatPathOptimizer.transform(operator, 1000); + concatPathOptimizer.transform(operator); IoTDB.metaManager.clear(); - // expected to throw LogicalOptimizeException: The value of SOFFSET (%d) is equal to or exceeds the number of sequences (%d) that can actually be returned. + // expected to throw LogicalOptimizeException: The value of SOFFSET (%d) is equal to or exceeds + // the number of sequences (%d) that can actually be returned. } @Test public void testDeleteStorageGroup() throws IllegalPathException { String sqlStr = "delete storage group root.vehicle.d1"; - RootOperator operator = (RootOperator) logicalGenerator - .generate(sqlStr, ZoneId.systemDefault()); + RootOperator operator = + (RootOperator) logicalGenerator.generate(sqlStr, ZoneId.systemDefault()); Assert.assertEquals(DeleteStorageGroupOperator.class, operator.getClass()); PartialPath path = new PartialPath("root.vehicle.d1"); Assert.assertEquals(path, ((DeleteStorageGroupOperator) operator).getDeletePathList().get(0)); @@ -200,8 +214,8 @@ public class LogicalPlanSmallTest { @Test public void testDisableAlign() { String sqlStr = "select * from root.vehicle disable align"; - RootOperator operator = (RootOperator) logicalGenerator - .generate(sqlStr, ZoneId.systemDefault()); + RootOperator operator = + (RootOperator) logicalGenerator.generate(sqlStr, ZoneId.systemDefault()); Assert.assertEquals(QueryOperator.class, operator.getClass()); Assert.assertFalse(((QueryOperator) operator).isAlignByTime()); } @@ -209,8 +223,8 @@ public class LogicalPlanSmallTest { @Test public void testNotDisableAlign() { String sqlStr = "select * from root.vehicle"; - RootOperator operator = (RootOperator) logicalGenerator - .generate(sqlStr, ZoneId.systemDefault()); + RootOperator operator = + (RootOperator) logicalGenerator.generate(sqlStr, ZoneId.systemDefault()); Assert.assertEquals(QueryOperator.class, operator.getClass()); Assert.assertTrue(((QueryOperator) operator).isAlignByTime()); } @@ -218,21 +232,20 @@ public class LogicalPlanSmallTest { @Test(expected = ParseCancellationException.class) public void testDisableAlignConflictAlignByDevice() { String sqlStr = "select * from root.vehicle disable align align by device"; - RootOperator operator = (RootOperator) logicalGenerator - .generate(sqlStr, ZoneId.systemDefault()); + RootOperator operator = + (RootOperator) logicalGenerator.generate(sqlStr, ZoneId.systemDefault()); } @Test public void testChineseCharacter() throws IllegalPathException { String sqlStr1 = "set storage group to root.一级"; - RootOperator operator = (RootOperator) logicalGenerator - .generate(sqlStr1, ZoneId.systemDefault()); + RootOperator operator = + (RootOperator) logicalGenerator.generate(sqlStr1, ZoneId.systemDefault()); Assert.assertEquals(SetStorageGroupOperator.class, operator.getClass()); Assert.assertEquals(new PartialPath("root.一级"), ((SetStorageGroupOperator) operator).getPath()); String sqlStr2 = "select * from root.一级.设备1 limit 10 offset 20"; - operator = (RootOperator) logicalGenerator - .generate(sqlStr2, ZoneId.systemDefault()); + operator = (RootOperator) logicalGenerator.generate(sqlStr2, ZoneId.systemDefault()); Assert.assertEquals(QueryOperator.class, operator.getClass()); ArrayList<PartialPath> paths = new ArrayList<>(); paths.add(new PartialPath("*")); diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java index 634770b..73e3cf3 100644 --- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java +++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java @@ -18,17 +18,6 @@ */ package org.apache.iotdb.db.utils; -import static org.junit.Assert.fail; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.concurrent.TimeUnit; -import javax.management.remote.JMXConnector; -import javax.management.remote.JMXConnectorFactory; -import javax.management.remote.JMXServiceURL; -import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.auth.AuthException; import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer; import org.apache.iotdb.db.conf.IoTDBConfig; @@ -48,17 +37,26 @@ import org.apache.iotdb.db.query.control.TracingManager; import org.apache.iotdb.db.rescon.PrimitiveArrayManager; import org.apache.iotdb.db.rescon.SystemInfo; import org.apache.iotdb.db.service.IoTDB; + +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; +import org.apache.commons.io.FileUtils; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * <p> - * This class is used for cleaning test environment in unit test and integration test - * </p> - */ +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.fail; + +/** This class is used for cleaning test environment in unit test and integration test */ public class EnvironmentUtils { private static final Logger logger = LoggerFactory.getLogger(EnvironmentUtils.class); @@ -75,8 +73,8 @@ public class EnvironmentUtils { private static IoTDB daemon; - public static boolean examinePorts = Boolean - .parseBoolean(System.getProperty("test.port.closed", "false")); + public static boolean examinePorts = + Boolean.parseBoolean(System.getProperty("test.port.closed", "false")); public static void cleanEnv() throws IOException, StorageEngineException { // wait all compaction finished @@ -94,11 +92,11 @@ public class EnvironmentUtils { // TODO: this is just too slow, especially on Windows, consider a better way boolean closed = examinePorts(); if (!closed) { - //sleep 10 seconds + // sleep 10 seconds try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { - //do nothing + // do nothing } if (!examinePorts()) { @@ -141,7 +139,6 @@ public class EnvironmentUtils { config.setMemtableSizeThreshold(oldGroupSizeInByte); } - private static boolean examinePorts() { TTransport transport = new TSocket("127.0.0.1", 6667, 100); if (!transport.isOpen()) { @@ -151,10 +148,10 @@ public class EnvironmentUtils { transport.close(); return false; } catch (TTransportException e) { - //do nothing + // do nothing } } - //try sync service + // try sync service transport = new TSocket("127.0.0.1", 5555, 100); if (!transport.isOpen()) { try { @@ -163,33 +160,31 @@ public class EnvironmentUtils { transport.close(); return false; } catch (TTransportException e) { - //do nothing + // do nothing } } - //try jmx connection + // try jmx connection try { - JMXServiceURL url = - new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:31999/jmxrmi"); + JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:31999/jmxrmi"); JMXConnector jmxConnector = JMXConnectorFactory.connect(url); logger.error("stop JMX failed. 31999 can be connected now."); jmxConnector.close(); return false; } catch (IOException e) { - //do nothing + // do nothing } - //try MetricService + // try MetricService try (Socket socket = new Socket()) { socket.connect(new InetSocketAddress("127.0.0.1", 8181), 100); logger.error("stop MetricService failed. 8181 can be connected now."); return false; } catch (Exception e) { - //do nothing + // do nothing } - //do nothing + // do nothing return true; } - public static void cleanAllDir() throws IOException { // delete sequential files for (String path : directoryManager.getAllSequenceFileFolders()) { @@ -217,20 +212,16 @@ public class EnvironmentUtils { FileUtils.deleteDirectory(new File(dir)); } - /** - * disable the system monitor</br> this function should be called before all code in the setup - */ + /** disable the system monitor</br> this function should be called before all code in the setup */ public static void closeStatMonitor() { config.setEnableStatMonitor(false); } - /** - * disable memory control</br> this function should be called before all code in the setup - */ + /** disable memory control</br> this function should be called before all code in the setup */ public static void envSetUp() { logger.warn("EnvironmentUtil setup..."); IoTDBDescriptor.getInstance().getConfig().setThriftServerAwaitTimeForStopService(0); - //we do not start 8181 port in test. + // we do not start 8181 port in test. IoTDBDescriptor.getInstance().getConfig().setEnableMetricService(false); IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(Integer.MAX_VALUE); if (daemon == null) { @@ -245,7 +236,7 @@ public class EnvironmentUtils { createAllDir(); // disable the system monitor config.setEnableStatMonitor(false); - TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true, 1024, 0); + TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignQueryId(true); TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID); } @@ -303,7 +294,7 @@ public class EnvironmentUtils { for (String dataDir : config.getDataDirs()) { createDir(dataDir); } - //create user and roles folder + // create user and roles folder try { BasicAuthorizer.getInstance().reset(); } catch (AuthException e) {
