This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/new-rc1.0.1 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 61d57438379089d6398c4f6a1099ada0158c68a9 Author: Jackie Tien <[email protected]> AuthorDate: Wed Feb 8 22:18:46 2023 +0800 [To rel/1.0] Correct DataPartiton Fetch request parameter contruction (#9019) --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 6 +- .../mpp/plan/analyze/ClusterPartitionFetcher.java | 76 ++++++++++++++++++---- .../distribution/DistributionPlanContext.java | 14 ++++ .../plan/planner/distribution/SourceRewriter.java | 42 ++++++++---- .../iotdb/db/mpp/plan/analyze/AnalyzeTest.java | 6 +- 6 files changed, 116 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 8664cf8ddc..269cf0a01e 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -904,7 +904,7 @@ public class IoTDBConfig { * series partition */ private String seriesPartitionExecutorClass = - "org.apache.iotdb.commons.partition.executor.hash.APHashExecutor"; + "org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor"; /** The number of series partitions in a database */ private int seriesPartitionSlotNum = 10000; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java index 81910ef650..dbffbb4be3 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java @@ -1682,7 +1682,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> for (InsertRowStatement insertRowStatement : insertRowsStatement.getInsertRowStatementList()) { Set<TTimePartitionSlot> timePartitionSlotSet = dataPartitionQueryParamMap.computeIfAbsent( - insertRowStatement.getDevicePath().getFullPath(), k -> new HashSet()); + insertRowStatement.getDevicePath().getFullPath(), k -> new HashSet<>()); timePartitionSlotSet.addAll(insertRowStatement.getTimePartitionSlots()); } @@ -1707,7 +1707,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> insertMultiTabletsStatement.getInsertTabletStatementList()) { Set<TTimePartitionSlot> timePartitionSlotSet = dataPartitionQueryParamMap.computeIfAbsent( - insertTabletStatement.getDevicePath().getFullPath(), k -> new HashSet()); + insertTabletStatement.getDevicePath().getFullPath(), k -> new HashSet<>()); timePartitionSlotSet.addAll(insertTabletStatement.getTimePartitionSlots()); } @@ -2372,7 +2372,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> context.setQueryType(QueryType.WRITE); List<List<String>> measurementsList = createTemplateStatement.getMeasurements(); - for (List measurements : measurementsList) { + for (List<String> measurements : measurementsList) { Set<String> measurementsSet = new HashSet<>(measurements); if (measurementsSet.size() < measurements.size()) { throw new SemanticException( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java index 2c70aaf3d0..78103835a5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java @@ -58,9 +58,11 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; public class ClusterPartitionFetcher implements IPartitionFetcher { @@ -181,7 +183,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { try (ConfigNodeClient client = configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) { TDataPartitionTableResp dataPartitionTableResp = - client.getDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap)); + client.getDataPartitionTable(constructDataPartitionReqForQuery(sgNameToQueryParamsMap)); if (dataPartitionTableResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { dataPartition = parseDataPartitionResp(dataPartitionTableResp); @@ -208,7 +210,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { try (ConfigNodeClient client = configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) { TDataPartitionTableResp dataPartitionTableResp = - client.getDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap)); + client.getDataPartitionTable(constructDataPartitionReqForQuery(sgNameToQueryParamsMap)); if (dataPartitionTableResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { return parseDataPartitionResp(dataPartitionTableResp); @@ -261,9 +263,8 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { if (null == dataPartition) { try (ConfigNodeClient client = configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) { - TDataPartitionTableResp dataPartitionTableResp = - client.getOrCreateDataPartitionTable( - constructDataPartitionReq(splitDataPartitionQueryParams)); + TDataPartitionReq req = constructDataPartitionReq(splitDataPartitionQueryParams); + TDataPartitionTableResp dataPartitionTableResp = client.getOrCreateDataPartitionTable(req); if (dataPartitionTableResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { @@ -350,6 +351,22 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { } } + private static class ComplexTimeSlotList { + Set<TTimePartitionSlot> timeSlotList; + boolean needLeftAll; + boolean needRightAll; + + private ComplexTimeSlotList(boolean needLeftAll, boolean needRightAll) { + timeSlotList = new HashSet<>(); + this.needLeftAll = needLeftAll; + this.needRightAll = needRightAll; + } + + private void putTimeSlot(List<TTimePartitionSlot> slotList) { + timeSlotList.addAll(slotList); + } + } + private TDataPartitionReq constructDataPartitionReq( Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) { Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>(); @@ -357,15 +374,50 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { sgNameToQueryParamsMap.entrySet()) { // for each sg Map<TSeriesPartitionSlot, TTimeSlotList> deviceToTimePartitionMap = new HashMap<>(); + + Map<TSeriesPartitionSlot, ComplexTimeSlotList> seriesSlotTimePartitionMap = new HashMap<>(); + + for (DataPartitionQueryParam queryParam : entry.getValue()) { + seriesSlotTimePartitionMap + .computeIfAbsent( + partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()), + k -> + new ComplexTimeSlotList( + queryParam.isNeedLeftAll(), queryParam.isNeedRightAll())) + .putTimeSlot(queryParam.getTimePartitionSlotList()); + } + seriesSlotTimePartitionMap.forEach( + (k, v) -> + deviceToTimePartitionMap.put( + k, + new TTimeSlotList( + new ArrayList<>(v.timeSlotList), v.needLeftAll, v.needRightAll))); + partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap); + } + return new TDataPartitionReq(partitionSlotsMap); + } + + /** For query, DataPartitionQueryParam is shared by each device */ + private TDataPartitionReq constructDataPartitionReqForQuery( + Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) { + Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>(); + TTimeSlotList sharedTTimeSlotList = null; + for (Map.Entry<String, List<DataPartitionQueryParam>> entry : + sgNameToQueryParamsMap.entrySet()) { + // for each sg + Map<TSeriesPartitionSlot, TTimeSlotList> deviceToTimePartitionMap = new HashMap<>(); + for (DataPartitionQueryParam queryParam : entry.getValue()) { - TTimeSlotList timePartitionSlotList = - new TTimeSlotList( - queryParam.getTimePartitionSlotList(), - queryParam.isNeedLeftAll(), - queryParam.isNeedRightAll()); - deviceToTimePartitionMap.put( + if (sharedTTimeSlotList == null) { + sharedTTimeSlotList = + new TTimeSlotList( + queryParam.getTimePartitionSlotList(), + queryParam.isNeedLeftAll(), + queryParam.isNeedRightAll()); + } + deviceToTimePartitionMap.putIfAbsent( partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()), - timePartitionSlotList); + sharedTTimeSlotList); } partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java index 32de442e65..c35c4a72ac 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java @@ -20,6 +20,9 @@ package org.apache.iotdb.db.mpp.plan.planner.distribution; import org.apache.iotdb.db.mpp.common.MPPQueryContext; +import org.apache.iotdb.db.mpp.plan.expression.Expression; + +import java.util.Map; public class DistributionPlanContext { protected boolean isRoot; @@ -32,6 +35,9 @@ public class DistributionPlanContext { // DataRegions protected boolean queryMultiRegion; + // used by group by level + private Map<String, Expression> columnNameToExpression; + protected DistributionPlanContext(MPPQueryContext queryContext) { this.isRoot = true; this.queryContext = queryContext; @@ -62,4 +68,12 @@ public class DistributionPlanContext { public void setQueryMultiRegion(boolean queryMultiRegion) { this.queryMultiRegion = queryMultiRegion; } + + public Map<String, Expression> getColumnNameToExpression() { + return columnNameToExpression; + } + + public void setColumnNameToExpression(Map<String, Expression> columnNameToExpression) { + this.columnNameToExpression = columnNameToExpression; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java index 9cb276384e..aa11eb80c6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java @@ -749,6 +749,18 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte : groupSourcesForGroupByLevel(root, sourceGroup, context); // Then, we calculate the attributes for GroupByLevelNode in each level + Map<String, Expression> columnNameToExpression = new HashMap<>(); + for (CrossSeriesAggregationDescriptor originalDescriptor : + newRoot.getGroupByLevelDescriptors()) { + for (Expression exp : originalDescriptor.getInputExpressions()) { + columnNameToExpression.put(exp.getExpressionString(), exp); + } + columnNameToExpression.put( + originalDescriptor.getOutputExpression().getExpressionString(), + originalDescriptor.getOutputExpression()); + } + + context.setColumnNameToExpression(columnNameToExpression); calculateGroupByLevelNodeAttributes(newRoot, 0, context); return newRoot; } @@ -884,22 +896,30 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte // Check every OutputColumn of GroupByLevelNode and set the Expression of corresponding // AggregationDescriptor List<CrossSeriesAggregationDescriptor> descriptorList = new ArrayList<>(); + Map<String, Expression> columnNameToExpression = context.getColumnNameToExpression(); + Set<Expression> childrenExpressionSet = new HashSet<>(); + for (String childColumn : childrenOutputColumns) { + Expression childExpression = + columnNameToExpression.get( + childColumn.substring(childColumn.indexOf("(") + 1, childColumn.lastIndexOf(")"))); + childrenExpressionSet.add(childExpression); + } + for (CrossSeriesAggregationDescriptor originalDescriptor : handle.getGroupByLevelDescriptors()) { Set<Expression> descriptorExpressions = new HashSet<>(); - for (String childColumn : childrenOutputColumns) { - // If this condition matched, the childColumn should come from GroupByLevelNode - if (isAggColumnMatchExpression(childColumn, originalDescriptor.getOutputExpression())) { - descriptorExpressions.add(originalDescriptor.getOutputExpression()); - continue; - } - for (Expression exp : originalDescriptor.getInputExpressions()) { - if (isAggColumnMatchExpression(childColumn, exp)) { - descriptorExpressions.add(exp); - } + + if (childrenExpressionSet.contains(originalDescriptor.getOutputExpression())) { + descriptorExpressions.add(originalDescriptor.getOutputExpression()); + } + + for (Expression exp : originalDescriptor.getInputExpressions()) { + if (childrenExpressionSet.contains(exp)) { + descriptorExpressions.add(exp); } } - if (descriptorExpressions.size() == 0) { + + if (descriptorExpressions.isEmpty()) { continue; } CrossSeriesAggregationDescriptor descriptor = originalDescriptor.deepClone(); diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java index d15cdd3ce4..3fbf8f44c1 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java @@ -581,13 +581,13 @@ public class AnalyzeTest { public void testDataPartitionAnalyze() { Analysis analysis = analyzeSQL("insert into root.sg.d1(timestamp,s) values(1,10),(86401,11)"); Assert.assertEquals( + 1, analysis .getDataPartitionInfo() .getDataPartitionMap() .get("root.sg") - .get(new TSeriesPartitionSlot(8923)) - .size(), - 1); + .get(new TSeriesPartitionSlot(1107)) + .size()); } @Test
