This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/InnerTimeJoin
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 13393987f247c0bd23841e1c6d56262573149c19
Author: JackieTien97 <[email protected]>
AuthorDate: Thu Jan 4 11:56:21 2024 +0800

    keep useless time partition
---
 .../db/queryengine/plan/analyze/Analysis.java      |  15 ++
 .../plan/planner/distribution/SourceRewriter.java  | 232 +++++++++++++++++++++
 .../plan/planner/plan/node/PlanGraphPrinter.java   |  47 ++++-
 .../plan/node/process/join/InnerTimeJoinNode.java  |  56 ++++-
 .../iotdb/commons/partition/DataPartition.java     |  74 ++++++-
 5 files changed, 416 insertions(+), 8 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index b8e5829d36a..635ba9119b3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSchemaNode;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.path.PartialPath;
@@ -296,6 +297,20 @@ public class Analysis {
     return dataPartition.getDataRegionReplicaSet(seriesPath.getDevice(), 
timefilter);
   }
 
+  /**
+   * Looks up the data region replica set of a series for a single time partition slot.
+   *
+   * <p>Only the first replica set reported by the data partition is returned.
+   */
+  public TRegionReplicaSet getPartitionInfo(
+      PartialPath seriesPath, TTimePartitionSlot tTimePartitionSlot) {
+    List<TRegionReplicaSet> replicaSets =
+        dataPartition.getDataRegionReplicaSet(seriesPath.getDevice(), tTimePartitionSlot);
+    return replicaSets.get(0);
+  }
+
+  /**
+   * Returns the time partition slots of the given series that satisfy the time filter. Adjacent
+   * time partitions that belong to the same data region are combined into one inner list.
+   */
+  public List<List<TTimePartitionSlot>> getTimePartitionRange(
+      PartialPath seriesPath, Filter timefilter) {
+    List<List<TTimePartitionSlot>> ranges =
+        dataPartition.getTimePartitionRange(seriesPath.getDevice(), timefilter);
+    return ranges;
+  }
+
   public List<TRegionReplicaSet> getPartitionInfo(String deviceName, Filter 
globalTimeFilter) {
     return dataPartition.getDataRegionReplicaSet(deviceName, globalTimeFilter);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index c5920833264..f27faab7d7d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -20,10 +20,12 @@
 package org.apache.iotdb.db.queryengine.plan.planner.distribution;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
@@ -48,6 +50,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDevi
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
@@ -63,7 +66,10 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -82,6 +88,12 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
 
   private final Analysis analysis;
 
+  private static final OrderByParameter TIME_ASC =
+      new OrderByParameter(Collections.singletonList(new 
SortItem(OrderByKey.TIME, Ordering.ASC)));
+
+  private static final OrderByParameter TIME_DESC =
+      new OrderByParameter(Collections.singletonList(new 
SortItem(OrderByKey.TIME, Ordering.DESC)));
+
   public SourceRewriter(Analysis analysis) {
     this.analysis = analysis;
   }
@@ -684,6 +696,226 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     return new LastQueryCollectNode(id, node.isContainsLastTransformNode());
   }
 
+  /**
+   * Rewrites an InnerTimeJoinNode into one sub-plan per continuous time partition range.
+   *
+   * <p>When the split yields exactly one sub-plan it is returned directly; otherwise the sub-plans
+   * are placed under a MergeSortNode ordered by time in the join's merge order.
+   *
+   * @throws IllegalStateException if any child of the join is not a SeriesSourceNode
+   */
+  @Override
+  public List<PlanNode> visitInnerTimeJoin(
+      InnerTimeJoinNode node, DistributionPlanContext context) {
+    List<SeriesSourceNode> seriesScanNodes = new ArrayList<>(node.getChildren().size());
+    List<List<List<TTimePartitionSlot>>> sourceTimeRangeList = new ArrayList<>();
+    // every child has to be a series source; remember it together with its time ranges
+    for (PlanNode child : node.getChildren()) {
+      if (child instanceof SeriesSourceNode) {
+        SeriesSourceNode sourceNode = (SeriesSourceNode) child;
+        seriesScanNodes.add(sourceNode);
+        sourceTimeRangeList.add(
+            analysis.getTimePartitionRange(
+                sourceNode.getPartitionPath(), context.getPartitionTimeFilter()));
+      } else {
+        throw new IllegalStateException(
+            "All child nodes of InnerTimeJoinNode should be SeriesSourceNode");
+      }
+    }
+
+    List<List<TTimePartitionSlot>> continuousRanges = splitTimePartition(sourceTimeRangeList);
+    List<PlanNode> subPlans =
+        splitInnerTimeJoinNode(continuousRanges, node, context, seriesScanNodes);
+    if (subPlans.size() == 1) {
+      return subPlans;
+    }
+
+    // add merge sort node for InnerTimeJoinNodes
+    // TODO add new type of Node, just traverse all child nodes sequentially in the future
+    MergeSortNode mergeSortNode =
+        new MergeSortNode(
+            context.queryContext.getQueryId().genPlanNodeId(),
+            node.getMergeOrder() == Ordering.ASC ? TIME_ASC : TIME_DESC,
+            node.getOutputColumnNames());
+    mergeSortNode.setChildren(subPlans);
+    return Collections.singletonList(mergeSortNode);
+  }
+
+  /**
+   * Folds the time ranges of all children into one list by combining them pairwise from left to
+   * right. Returns an empty list when there are no children.
+   */
+  private List<List<TTimePartitionSlot>> splitTimePartition(
+      List<List<List<TTimePartitionSlot>>> childTimePartitionList) {
+    int childCount = childTimePartitionList.size();
+    if (childCount == 0) {
+      return Collections.emptyList();
+    }
+    // seed the fold with a copy of the first child's outer list
+    List<List<TTimePartitionSlot>> merged = new ArrayList<>(childTimePartitionList.get(0));
+    int next = 1;
+    while (next < childCount) {
+      merged = combineTwoTimePartitionList(merged, childTimePartitionList.get(next));
+      next++;
+    }
+    return merged;
+  }
+
+  /**
+   * Merges the continuous time partition ranges of two operands into a single list of ranges.
+   *
+   * <p>Both operands are walked like a sorted merge on {@code startTime}, so the slots are assumed
+   * to be in ascending start-time order within and across the inner lists. A slot whose start
+   * time appears on both sides is emitted once (the left instance is kept). A new output range is
+   * opened whenever the emitted slot is the first slot of a non-first inner list on the side it
+   * is taken from.
+   *
+   * @param left continuous time ranges of the first operand
+   * @param right continuous time ranges of the second operand
+   * @return the merged ranges; a list holding one empty range when both operands are empty
+   */
+  private List<List<TTimePartitionSlot>> combineTwoTimePartitionList(
+      List<List<TTimePartitionSlot>> left, List<List<TTimePartitionSlot>> right) {
+    int leftIndex = 0;
+    int leftSize = left.size();
+    int rightIndex = 0;
+    int rightSize = right.size();
+
+    List<List<TTimePartitionSlot>> res = new ArrayList<>(Math.max(leftSize, rightSize));
+    // common case, one SeriesSlot only belongs to one data region
+    if (leftSize == 1 && rightSize == 1) {
+      List<TTimePartitionSlot> list = new ArrayList<>();
+      List<TTimePartitionSlot> left0 = left.get(0);
+      List<TTimePartitionSlot> right0 = right.get(0);
+
+      int left0Index = 0;
+      int left0Size = left0.size();
+      int right0Index = 0;
+      int right0Size = right0.size();
+      while (left0Index < left0Size && right0Index < right0Size) {
+        long leftStart = left0.get(left0Index).startTime;
+        long rightStart = right0.get(right0Index).startTime;
+        if (leftStart == rightStart) {
+          list.add(left0.get(left0Index));
+          left0Index++;
+          right0Index++;
+        } else if (leftStart < rightStart) {
+          list.add(left0.get(left0Index));
+          left0Index++;
+        } else {
+          list.add(right0.get(right0Index));
+          right0Index++;
+        }
+      }
+      // at most one side still has slots left; append only the part not consumed yet
+      list.addAll(left0.subList(left0Index, left0Size));
+      list.addAll(right0.subList(right0Index, right0Size));
+      res.add(list);
+      return res;
+    }
+
+    int previousResIndex = 0;
+    res.add(new ArrayList<>());
+    int leftCurrentListIndex = 0;
+    int rightCurrentListIndex = 0;
+    while (leftIndex < leftSize && rightIndex < rightSize) {
+      List<TTimePartitionSlot> leftCurrentList = left.get(leftIndex);
+      List<TTimePartitionSlot> rightCurrentList = right.get(rightIndex);
+      int leftCurrentListSize = leftCurrentList.size();
+      int rightCurrentListSize = rightCurrentList.size();
+      while (leftCurrentListIndex < leftCurrentListSize
+          && rightCurrentListIndex < rightCurrentListSize) {
+        long leftStart = leftCurrentList.get(leftCurrentListIndex).startTime;
+        long rightStart = rightCurrentList.get(rightCurrentListIndex).startTime;
+        if (leftStart == rightStart) {
+          // new continuous time range
+          if ((leftCurrentListIndex == 0 && leftIndex != 0)
+              || (rightCurrentListIndex == 0 && rightIndex != 0)) {
+            previousResIndex++;
+            res.add(new ArrayList<>());
+          }
+          res.get(previousResIndex).add(leftCurrentList.get(leftCurrentListIndex));
+          leftCurrentListIndex++;
+          rightCurrentListIndex++;
+        } else if (leftStart < rightStart) {
+          // new continuous time range
+          if (leftCurrentListIndex == 0 && leftIndex != 0) {
+            previousResIndex++;
+            res.add(new ArrayList<>());
+          }
+          res.get(previousResIndex).add(leftCurrentList.get(leftCurrentListIndex));
+          leftCurrentListIndex++;
+        } else {
+          // new continuous time range
+          if (rightCurrentListIndex == 0 && rightIndex != 0) {
+            previousResIndex++;
+            res.add(new ArrayList<>());
+          }
+          res.get(previousResIndex).add(rightCurrentList.get(rightCurrentListIndex));
+          rightCurrentListIndex++;
+        }
+      }
+      if (leftCurrentListIndex == leftCurrentListSize) {
+        leftIndex++;
+        leftCurrentListIndex = 0;
+      }
+      if (rightCurrentListIndex == rightCurrentListSize) {
+        rightIndex++;
+        rightCurrentListIndex = 0;
+      }
+    }
+
+    // The loop above ends with at least one operand exhausted, so at most one of the two loops
+    // below does any work. The first remaining inner list may be partly consumed already, hence
+    // only its unread tail is appended.
+    while (rightIndex < rightSize) {
+      if (rightCurrentListIndex == 0 && rightIndex != 0) {
+        previousResIndex++;
+        res.add(new ArrayList<>());
+      }
+      List<TTimePartitionSlot> rest = right.get(rightIndex);
+      res.get(previousResIndex).addAll(rest.subList(rightCurrentListIndex, rest.size()));
+      rightIndex++;
+      rightCurrentListIndex = 0;
+    }
+
+    while (leftIndex < leftSize) {
+      if (leftCurrentListIndex == 0 && leftIndex != 0) {
+        previousResIndex++;
+        res.add(new ArrayList<>());
+      }
+      List<TTimePartitionSlot> rest = left.get(leftIndex);
+      res.get(previousResIndex).addAll(rest.subList(leftCurrentListIndex, rest.size()));
+      leftIndex++;
+      leftCurrentListIndex = 0;
+    }
+    return res;
+  }
+
+  /**
+   * Builds one plan node per non-empty continuous time range.
+   *
+   * <p>For each range, every source node is cloned and attached to an InnerTimeJoinNode keyed by
+   * the region id that the range's first time partition resolves to for that source. If the
+   * sources of a range fall into several regions, the per-region joins become children of one
+   * more InnerTimeJoinNode; otherwise the single per-region join is used directly.
+   */
+  private List<PlanNode> splitInnerTimeJoinNode(
+      List<List<TTimePartitionSlot>> continuousTimeRange,
+      InnerTimeJoinNode node,
+      DistributionPlanContext context,
+      List<SeriesSourceNode> seriesScanNodes) {
+    List<PlanNode> result = new ArrayList<>(continuousTimeRange.size());
+    for (List<TTimePartitionSlot> slots : continuousTimeRange) {
+      if (slots.isEmpty()) {
+        continue;
+      }
+      InnerTimeJoinNode rangeRoot = (InnerTimeJoinNode) node.clone();
+      rangeRoot.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+
+      List<Long> timePartitionIds = convertToTimePartitionIds(slots);
+      rangeRoot.setTimePartitions(timePartitionIds);
+
+      // region group id -> parent InnerTimeJoinNode
+      Map<Integer, InnerTimeJoinNode> joinPerRegion = new HashMap<>();
+      for (SeriesSourceNode sourceNode : seriesScanNodes) {
+        TRegionReplicaSet dataRegion =
+            analysis.getPartitionInfo(sourceNode.getPartitionPath(), slots.get(0));
+        InnerTimeJoinNode regionJoin = joinPerRegion.get(dataRegion.regionId.id);
+        if (regionJoin == null) {
+          regionJoin = (InnerTimeJoinNode) node.clone();
+          regionJoin.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+          regionJoin.setTimePartitions(timePartitionIds);
+          joinPerRegion.put(dataRegion.regionId.id, regionJoin);
+        }
+        SeriesSourceNode split = (SeriesSourceNode) sourceNode.clone();
+        split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+        split.setRegionReplicaSet(dataRegion);
+        regionJoin.addChild(split);
+      }
+
+      if (joinPerRegion.size() <= 1) {
+        result.add(joinPerRegion.values().iterator().next());
+      } else {
+        for (InnerTimeJoinNode regionJoin : joinPerRegion.values()) {
+          rangeRoot.addChild(regionJoin);
+        }
+        result.add(rangeRoot);
+      }
+    }
+    return result;
+  }
+
+  /** Maps each time partition slot to its time partition id, keeping the input order. */
+  private List<Long> convertToTimePartitionIds(List<TTimePartitionSlot> timePartitionSlotList) {
+    int slotCount = timePartitionSlotList.size();
+    List<Long> ids = new ArrayList<>(slotCount);
+    for (int i = 0; i < slotCount; i++) {
+      TTimePartitionSlot slot = timePartitionSlotList.get(i);
+      ids.add(TimePartitionUtils.getTimePartitionId(slot.startTime));
+    }
+    return ids;
+  }
+
   @Override
   public List<PlanNode> visitFullOuterTimeJoin(
       FullOuterTimeJoinNode node, DistributionPlanContext context) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index 48ba7e943c4..fcebd9b0c66 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -44,6 +44,8 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.LeftOuterTimeJoinNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
@@ -70,6 +72,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 
 public class PlanGraphPrinter extends PlanVisitor<List<String>, 
PlanGraphPrinter.GraphContext> {
 
@@ -325,10 +328,52 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
     return render(node, boxValue, context);
   }
 
+  /** Renders a LeftOuterTimeJoinNode as a box listing its plan node id and merge order. */
+  @Override
+  public List<String> visitLeftOuterTimeJoin(LeftOuterTimeJoinNode node, GraphContext context) {
+    String title = String.format("LeftOuterTimeJoin-%s", node.getPlanNodeId().getId());
+    String order = String.format("Order: %s", node.getMergeOrder());
+    List<String> boxValue = new ArrayList<>();
+    boxValue.add(title);
+    boxValue.add(order);
+    return render(node, boxValue, context);
+  }
+
+  /**
+   * Renders an InnerTimeJoinNode as a box listing its plan node id, merge order and time
+   * partitions.
+   *
+   * <p>Partition ids are printed four per line. A node without a partition list is shown as
+   * {@code TimePartitions: ALL}; an empty list is shown as {@code TimePartitions: []}.
+   */
+  @Override
+  public List<String> visitInnerTimeJoin(InnerTimeJoinNode node, GraphContext context) {
+    List<String> boxValue = new ArrayList<>();
+    boxValue.add(String.format("InnerTimeJoin-%s", node.getPlanNodeId().getId()));
+    boxValue.add(String.format("Order: %s", node.getMergeOrder()));
+    Optional<List<Long>> timePartitions = node.getTimePartitions();
+    if (timePartitions.isPresent()) {
+      List<Long> ids = timePartitions.get();
+      StringBuilder builder = new StringBuilder("TimePartitions: [");
+      // a single index drives both the separator and the line break, so it always advances
+      for (int i = 0, size = ids.size(); i < size; i++) {
+        if (i > 0) {
+          builder.append(",");
+          // start a new line after every fourth id
+          if (i % 4 == 0) {
+            builder.append(System.lineSeparator());
+          }
+        }
+        builder.append(ids.get(i));
+      }
+      builder.append("]");
+      boxValue.add(builder.toString());
+    } else {
+      boxValue.add("TimePartitions: ALL");
+    }
+
+    return render(node, boxValue, context);
+  }
+
   @Override
   public List<String> visitFullOuterTimeJoin(FullOuterTimeJoinNode node, 
GraphContext context) {
     List<String> boxValue = new ArrayList<>();
-    boxValue.add(String.format("TimeJoin-%s", node.getPlanNodeId().getId()));
+    boxValue.add(String.format("FullOuterTimeJoin-%s", 
node.getPlanNodeId().getId()));
     boxValue.add(String.format("Order: %s", node.getMergeOrder()));
     return render(node, boxValue, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/InnerTimeJoinNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/InnerTimeJoinNode.java
index 380c25b7de4..6b885d87b37 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/InnerTimeJoinNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/InnerTimeJoinNode.java
@@ -33,6 +33,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 /**
@@ -56,14 +57,23 @@ public class InnerTimeJoinNode extends 
MultiChildProcessNode {
   // This parameter indicates the order when executing multiway merge sort.
   private final Ordering mergeOrder;
 
+  // null for all time partitions
+  // empty for zero time partitions
+  private List<Long> timePartitions;
+
   public InnerTimeJoinNode(PlanNodeId id, Ordering mergeOrder) {
-    super(id, new ArrayList<>());
-    this.mergeOrder = mergeOrder;
+    this(id, new ArrayList<>(), mergeOrder, null);
   }
 
   public InnerTimeJoinNode(PlanNodeId id, Ordering mergeOrder, List<PlanNode> 
children) {
+    this(id, children, mergeOrder, null);
+  }
+
+  public InnerTimeJoinNode(
+      PlanNodeId id, List<PlanNode> children, Ordering mergeOrder, List<Long> 
timePartitions) {
     super(id, children);
     this.mergeOrder = mergeOrder;
+    this.timePartitions = timePartitions;
   }
 
   public Ordering getMergeOrder() {
@@ -79,8 +89,9 @@ public class InnerTimeJoinNode extends MultiChildProcessNode {
   public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
     return new InnerTimeJoinNode(
         new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+        new ArrayList<>(children.subList(startIndex, endIndex)),
         getMergeOrder(),
-        new ArrayList<>(children.subList(startIndex, endIndex)));
+        timePartitions);
   }
 
   @Override
@@ -100,18 +111,45 @@ public class InnerTimeJoinNode extends 
MultiChildProcessNode {
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.INNER_TIME_JOIN.serialize(byteBuffer);
     ReadWriteIOUtils.write(mergeOrder.ordinal(), byteBuffer);
+    if (timePartitions == null) {
+      ReadWriteIOUtils.write(false, byteBuffer);
+    } else {
+      ReadWriteIOUtils.write(true, byteBuffer);
+      ReadWriteIOUtils.write(timePartitions.size(), byteBuffer);
+      for (Long timePartitionId : timePartitions) {
+        ReadWriteIOUtils.write(timePartitionId, byteBuffer);
+      }
+    }
   }
 
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
     PlanNodeType.INNER_TIME_JOIN.serialize(stream);
     ReadWriteIOUtils.write(mergeOrder.ordinal(), stream);
+    if (timePartitions == null) {
+      ReadWriteIOUtils.write(false, stream);
+    } else {
+      ReadWriteIOUtils.write(true, stream);
+      ReadWriteIOUtils.write(timePartitions.size(), stream);
+      for (Long timePartitionId : timePartitions) {
+        ReadWriteIOUtils.write(timePartitionId, stream);
+      }
+    }
   }
 
   public static InnerTimeJoinNode deserialize(ByteBuffer byteBuffer) {
     Ordering mergeOrder = 
Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+    List<Long> timePartitionIds = null;
+    boolean hasTimePartitionIds = ReadWriteIOUtils.readBool(byteBuffer);
+    if (hasTimePartitionIds) {
+      int size = ReadWriteIOUtils.read(byteBuffer);
+      timePartitionIds = new ArrayList<>(size);
+      for (int i = 0; i < size; i++) {
+        timePartitionIds.add(ReadWriteIOUtils.readLong(byteBuffer));
+      }
+    }
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new InnerTimeJoinNode(planNodeId, mergeOrder);
+    return new InnerTimeJoinNode(planNodeId, new ArrayList<>(), mergeOrder, 
timePartitionIds);
   }
 
   @Override
@@ -138,4 +176,14 @@ public class InnerTimeJoinNode extends 
MultiChildProcessNode {
   public int hashCode() {
     return Objects.hash(super.hashCode(), mergeOrder);
   }
+
+  /**
+   * Sets the time partition ids of this node.
+   *
+   * @param timePartitions {@code null} for all time partitions, an empty list for zero time
+   *     partitions
+   */
+  public void setTimePartitions(List<Long> timePartitions) {
+    this.timePartitions = timePartitions;
+  }
+
+  /**
+   * Returns the time partition ids of this node.
+   *
+   * @return an empty {@code Optional} when the underlying list is {@code null} (all time
+   *     partitions); otherwise the list itself, which may be empty (zero time partitions)
+   */
+  public Optional<List<Long>> getTimePartitions() {
+    return Optional.ofNullable(timePartitions);
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index c8425398083..07927bb4f24 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -28,10 +28,12 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+
+import static java.util.stream.Collectors.toList;
 
 // TODO: Remove this class
 public class DataPartition extends Partition {
@@ -71,6 +73,49 @@ public class DataPartition extends Partition {
     this.dataPartitionMap = dataPartitionMap;
   }
 
+  /**
+   * Collects the time partition slots of a device that satisfy the time filter, ordered by start
+   * time, and groups consecutive slots whose first replica set has the same region id.
+   *
+   * @param deviceName device whose storage group and series partition slot are looked up
+   * @param timeFilter filter checked against each slot's start time
+   * @return one inner list per run of consecutive slots in the same region; empty when the
+   *     storage group or series slot is not in the partition map, or when no slot matches
+   */
+  public List<List<TTimePartitionSlot>> getTimePartitionRange(
+      String deviceName, Filter timeFilter) {
+    String storageGroup = getStorageGroupByDevice(deviceName);
+    TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName);
+    if (!(dataPartitionMap.containsKey(storageGroup)
+        && dataPartitionMap.get(storageGroup).containsKey(seriesPartitionSlot))) {
+      return Collections.emptyList();
+    }
+
+    Map<TTimePartitionSlot, List<TRegionReplicaSet>> slotToReplicas =
+        dataPartitionMap.get(storageGroup).get(seriesPartitionSlot);
+    List<TTimePartitionSlot> sortedSlots =
+        slotToReplicas.keySet().stream()
+            .filter(
+                slot -> TimePartitionUtils.satisfyPartitionStartTime(timeFilter, slot.startTime))
+            .sorted(Comparator.comparingLong(TTimePartitionSlot::getStartTime))
+            .collect(toList());
+
+    List<List<TTimePartitionSlot>> ranges = new ArrayList<>();
+    List<TTimePartitionSlot> currentRange = null;
+    int currentRegionId = 0;
+    for (TTimePartitionSlot slot : sortedSlots) {
+      int regionId = slotToReplicas.get(slot).get(0).regionId.id;
+      // open a new range for the very first slot and whenever the region changes
+      if (currentRange == null || regionId != currentRegionId) {
+        currentRange = new ArrayList<>();
+        ranges.add(currentRange);
+        currentRegionId = regionId;
+      }
+      currentRange.add(slot);
+    }
+
+    return ranges;
+  }
+
   public List<TRegionReplicaSet> getDataRegionReplicaSet(String deviceName, 
Filter timeFilter) {
     String storageGroup = getStorageGroupByDevice(deviceName);
     TSeriesPartitionSlot seriesPartitionSlot = 
calculateDeviceGroupId(deviceName);
@@ -84,7 +129,30 @@ public class DataPartition extends Partition {
                 TimePartitionUtils.satisfyPartitionStartTime(timeFilter, 
entry.getKey().startTime))
         .flatMap(entry -> entry.getValue().stream())
         .distinct()
-        .collect(Collectors.toList());
+        .collect(toList());
+  }
+
+  public List<TRegionReplicaSet> getDataRegionReplicaSet(
+      String deviceName, TTimePartitionSlot tTimePartitionSlot) {
+    String storageGroup = getStorageGroupByDevice(deviceName);
+    Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>> dbMap =
+        dataPartitionMap.get(storageGroup);
+    if (dbMap == null) {
+      return Collections.singletonList(NOT_ASSIGNED);
+    }
+    TSeriesPartitionSlot seriesPartitionSlot = 
calculateDeviceGroupId(deviceName);
+    Map<TTimePartitionSlot, List<TRegionReplicaSet>> seriesSlotMap = 
dbMap.get(seriesPartitionSlot);
+    if (seriesSlotMap == null) {
+      return Collections.singletonList(NOT_ASSIGNED);
+    }
+
+    List<TRegionReplicaSet> regionReplicaSets = 
seriesSlotMap.get(tTimePartitionSlot);
+
+    if (regionReplicaSets == null) {
+      return Collections.singletonList(NOT_ASSIGNED);
+    }
+
+    return regionReplicaSets;
   }
 
   public List<TRegionReplicaSet> getDataRegionReplicaSetForWriting(
@@ -155,7 +223,7 @@ public class DataPartition extends Partition {
               partition.entrySet().stream()
                   .flatMap(
                       s -> s.getValue().entrySet().stream().flatMap(e -> 
e.getValue().stream()))
-                  .collect(Collectors.toList());
+                  .collect(toList());
           for (TRegionReplicaSet regionReplicaSet : ret) {
             distributionMap
                 .computeIfAbsent(regionReplicaSet, RegionReplicaSetInfo::new)

Reply via email to