This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/cp-commits
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0fe90c90f6057a359964285b552f0629bb98e5dd
Author: Jackie Tien <[email protected]>
AuthorDate: Wed Feb 8 22:18:46 2023 +0800

    fix conflict in 0b65f4d
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  6 +-
 .../mpp/plan/analyze/ClusterPartitionFetcher.java  | 76 ++++++++++++++++++----
 .../distribution/DistributionPlanContext.java      | 14 ++++
 .../plan/planner/distribution/SourceRewriter.java  | 41 ++++++++----
 .../iotdb/db/mpp/plan/analyze/AnalyzeTest.java     |  6 +-
 6 files changed, 115 insertions(+), 30 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 8664cf8ddc..269cf0a01e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -904,7 +904,7 @@ public class IoTDBConfig {
    * series partition
    */
   private String seriesPartitionExecutorClass =
-      "org.apache.iotdb.commons.partition.executor.hash.APHashExecutor";
+      "org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
 
   /** The number of series partitions in a database */
   private int seriesPartitionSlotNum = 10000;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 81910ef650..dbffbb4be3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -1682,7 +1682,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     for (InsertRowStatement insertRowStatement : insertRowsStatement.getInsertRowStatementList()) {
       Set<TTimePartitionSlot> timePartitionSlotSet =
           dataPartitionQueryParamMap.computeIfAbsent(
-              insertRowStatement.getDevicePath().getFullPath(), k -> new HashSet());
+              insertRowStatement.getDevicePath().getFullPath(), k -> new HashSet<>());
       timePartitionSlotSet.addAll(insertRowStatement.getTimePartitionSlots());
     }
 
@@ -1707,7 +1707,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
         insertMultiTabletsStatement.getInsertTabletStatementList()) {
       Set<TTimePartitionSlot> timePartitionSlotSet =
           dataPartitionQueryParamMap.computeIfAbsent(
-              insertTabletStatement.getDevicePath().getFullPath(), k -> new HashSet());
+              insertTabletStatement.getDevicePath().getFullPath(), k -> new HashSet<>());
       timePartitionSlotSet.addAll(insertTabletStatement.getTimePartitionSlots());
     }
 
@@ -2372,7 +2372,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
 
     context.setQueryType(QueryType.WRITE);
     List<List<String>> measurementsList = createTemplateStatement.getMeasurements();
-    for (List measurements : measurementsList) {
+    for (List<String> measurements : measurementsList) {
       Set<String> measurementsSet = new HashSet<>(measurements);
       if (measurementsSet.size() < measurements.size()) {
         throw new SemanticException(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
index 2c70aaf3d0..78103835a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
@@ -58,9 +58,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class ClusterPartitionFetcher implements IPartitionFetcher {
 
@@ -181,7 +183,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
       try (ConfigNodeClient client =
           configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
         TDataPartitionTableResp dataPartitionTableResp =
-            client.getDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
+            client.getDataPartitionTable(constructDataPartitionReqForQuery(sgNameToQueryParamsMap));
         if (dataPartitionTableResp.getStatus().getCode()
             == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           dataPartition = parseDataPartitionResp(dataPartitionTableResp);
@@ -208,7 +210,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     try (ConfigNodeClient client =
         configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
       TDataPartitionTableResp dataPartitionTableResp =
-          client.getDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
+          client.getDataPartitionTable(constructDataPartitionReqForQuery(sgNameToQueryParamsMap));
       if (dataPartitionTableResp.getStatus().getCode()
           == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return parseDataPartitionResp(dataPartitionTableResp);
@@ -261,9 +263,8 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     if (null == dataPartition) {
       try (ConfigNodeClient client =
           configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
-        TDataPartitionTableResp dataPartitionTableResp =
-            client.getOrCreateDataPartitionTable(
-                constructDataPartitionReq(splitDataPartitionQueryParams));
+        TDataPartitionReq req = constructDataPartitionReq(splitDataPartitionQueryParams);
+        TDataPartitionTableResp dataPartitionTableResp = client.getOrCreateDataPartitionTable(req);
 
         if (dataPartitionTableResp.getStatus().getCode()
             == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -350,6 +351,22 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
     }
   }
 
+  private static class ComplexTimeSlotList {
+    Set<TTimePartitionSlot> timeSlotList;
+    boolean needLeftAll;
+    boolean needRightAll;
+
+    private ComplexTimeSlotList(boolean needLeftAll, boolean needRightAll) {
+      timeSlotList = new HashSet<>();
+      this.needLeftAll = needLeftAll;
+      this.needRightAll = needRightAll;
+    }
+
+    private void putTimeSlot(List<TTimePartitionSlot> slotList) {
+      timeSlotList.addAll(slotList);
+    }
+  }
+
   private TDataPartitionReq constructDataPartitionReq(
       Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
     Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>();
@@ -357,15 +374,50 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
         sgNameToQueryParamsMap.entrySet()) {
       // for each sg
       Map<TSeriesPartitionSlot, TTimeSlotList> deviceToTimePartitionMap = new HashMap<>();
+
+      Map<TSeriesPartitionSlot, ComplexTimeSlotList> seriesSlotTimePartitionMap = new HashMap<>();
+
+      for (DataPartitionQueryParam queryParam : entry.getValue()) {
+        seriesSlotTimePartitionMap
+            .computeIfAbsent(
+                partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()),
+                k ->
+                    new ComplexTimeSlotList(
+                        queryParam.isNeedLeftAll(), queryParam.isNeedRightAll()))
+            .putTimeSlot(queryParam.getTimePartitionSlotList());
+      }
+      seriesSlotTimePartitionMap.forEach(
+          (k, v) ->
+              deviceToTimePartitionMap.put(
+                  k,
+                  new TTimeSlotList(
+                      new ArrayList<>(v.timeSlotList), v.needLeftAll, v.needRightAll)));
+      partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap);
+    }
+    return new TDataPartitionReq(partitionSlotsMap);
+  }
+
+  /** For query, DataPartitionQueryParam is shared by each device */
+  private TDataPartitionReq constructDataPartitionReqForQuery(
+      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+    Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>();
+    TTimeSlotList sharedTTimeSlotList = null;
+    for (Map.Entry<String, List<DataPartitionQueryParam>> entry :
+        sgNameToQueryParamsMap.entrySet()) {
+      // for each sg
+      Map<TSeriesPartitionSlot, TTimeSlotList> deviceToTimePartitionMap = new HashMap<>();
+
       for (DataPartitionQueryParam queryParam : entry.getValue()) {
-        TTimeSlotList timePartitionSlotList =
-            new TTimeSlotList(
-                queryParam.getTimePartitionSlotList(),
-                queryParam.isNeedLeftAll(),
-                queryParam.isNeedRightAll());
-        deviceToTimePartitionMap.put(
+        if (sharedTTimeSlotList == null) {
+          sharedTTimeSlotList =
+              new TTimeSlotList(
+                  queryParam.getTimePartitionSlotList(),
+                  queryParam.isNeedLeftAll(),
+                  queryParam.isNeedRightAll());
+        }
+        deviceToTimePartitionMap.putIfAbsent(
             partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()),
-            timePartitionSlotList);
+            sharedTTimeSlotList);
       }
       partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
index 32de442e65..c35c4a72ac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
@@ -20,6 +20,9 @@
 package org.apache.iotdb.db.mpp.plan.planner.distribution;
 
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+
+import java.util.Map;
 
 public class DistributionPlanContext {
   protected boolean isRoot;
@@ -32,6 +35,9 @@ public class DistributionPlanContext {
   // DataRegions
   protected boolean queryMultiRegion;
 
+  // used by group by level
+  private Map<String, Expression> columnNameToExpression;
+
   protected DistributionPlanContext(MPPQueryContext queryContext) {
     this.isRoot = true;
     this.queryContext = queryContext;
@@ -62,4 +68,12 @@ public class DistributionPlanContext {
   public void setQueryMultiRegion(boolean queryMultiRegion) {
     this.queryMultiRegion = queryMultiRegion;
   }
+
+  public Map<String, Expression> getColumnNameToExpression() {
+    return columnNameToExpression;
+  }
+
+  public void setColumnNameToExpression(Map<String, Expression> columnNameToExpression) {
+    this.columnNameToExpression = columnNameToExpression;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 9cb276384e..0a3708a1a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -749,6 +749,18 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
             : groupSourcesForGroupByLevel(root, sourceGroup, context);
 
     // Then, we calculate the attributes for GroupByLevelNode in each level
+    Map<String, Expression> columnNameToExpression = new HashMap<>();
+    for (CrossSeriesAggregationDescriptor originalDescriptor :
+        newRoot.getGroupByLevelDescriptors()) {
+      for (Expression exp : originalDescriptor.getInputExpressions()) {
+        columnNameToExpression.put(exp.getExpressionString(), exp);
+      }
+      columnNameToExpression.put(
+          originalDescriptor.getOutputExpression().getExpressionString(),
+          originalDescriptor.getOutputExpression());
+    }
+
+    context.setColumnNameToExpression(columnNameToExpression);
     calculateGroupByLevelNodeAttributes(newRoot, 0, context);
     return newRoot;
   }
@@ -884,22 +896,29 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
       // Check every OutputColumn of GroupByLevelNode and set the Expression of corresponding
       // AggregationDescriptor
       List<CrossSeriesAggregationDescriptor> descriptorList = new ArrayList<>();
+      Map<String, Expression> columnNameToExpression = context.getColumnNameToExpression();
+      Set<Expression> childrenExpressionSet = new HashSet<>();
+      for (String childColumn : childrenOutputColumns) {
+        Expression childExpression =
+            columnNameToExpression.get(
+                childColumn.substring(childColumn.indexOf("(") + 1, childColumn.lastIndexOf(")")));
+        childrenExpressionSet.add(childExpression);
+      }
+
       for (CrossSeriesAggregationDescriptor originalDescriptor :
           handle.getGroupByLevelDescriptors()) {
         Set<Expression> descriptorExpressions = new HashSet<>();
-        for (String childColumn : childrenOutputColumns) {
-          // If this condition matched, the childColumn should come from GroupByLevelNode
-          if (isAggColumnMatchExpression(childColumn, originalDescriptor.getOutputExpression())) {
-            descriptorExpressions.add(originalDescriptor.getOutputExpression());
-            continue;
-          }
-          for (Expression exp : originalDescriptor.getInputExpressions()) {
-            if (isAggColumnMatchExpression(childColumn, exp)) {
-              descriptorExpressions.add(exp);
-            }
+
+        if (childrenExpressionSet.contains(originalDescriptor.getOutputExpression())) {
+          descriptorExpressions.add(originalDescriptor.getOutputExpression());
+        }
+
+        for (Expression exp : originalDescriptor.getInputExpressions()) {
+          if (childrenExpressionSet.contains(exp)) {
+            descriptorExpressions.add(exp);
           }
         }
-        if (descriptorExpressions.size() == 0) {
+        if (descriptorExpressions.isEmpty()) {
           continue;
         }
         CrossSeriesAggregationDescriptor descriptor = originalDescriptor.deepClone();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
index d15cdd3ce4..3fbf8f44c1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
@@ -581,13 +581,13 @@ public class AnalyzeTest {
   public void testDataPartitionAnalyze() {
     Analysis analysis = analyzeSQL("insert into root.sg.d1(timestamp,s) values(1,10),(86401,11)");
     Assert.assertEquals(
+        1,
         analysis
             .getDataPartitionInfo()
             .getDataPartitionMap()
             .get("root.sg")
-            .get(new TSeriesPartitionSlot(8923))
-            .size(),
-        1);
+            .get(new TSeriesPartitionSlot(1107))
+            .size());
   }
 
   @Test

Reply via email to