This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/InnerTimeJoin in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 13393987f247c0bd23841e1c6d56262573149c19 Author: JackieTien97 <[email protected]> AuthorDate: Thu Jan 4 11:56:21 2024 +0800 keep useless time partition --- .../db/queryengine/plan/analyze/Analysis.java | 15 ++ .../plan/planner/distribution/SourceRewriter.java | 232 +++++++++++++++++++++ .../plan/planner/plan/node/PlanGraphPrinter.java | 47 ++++- .../plan/node/process/join/InnerTimeJoinNode.java | 56 ++++- .../iotdb/commons/partition/DataPartition.java | 74 ++++++- 5 files changed, 416 insertions(+), 8 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java index b8e5829d36a..635ba9119b3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java @@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSchemaNode; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.commons.partition.SchemaPartition; import org.apache.iotdb.commons.path.PartialPath; @@ -296,6 +297,20 @@ public class Analysis { return dataPartition.getDataRegionReplicaSet(seriesPath.getDevice(), timefilter); } + public TRegionReplicaSet getPartitionInfo( + PartialPath seriesPath, TTimePartitionSlot tTimePartitionSlot) { + return dataPartition.getDataRegionReplicaSet(seriesPath.getDevice(), tTimePartitionSlot).get(0); + } + + /** + * Get all time partition ids and combine adjacent time partition if they belong to same data + * region + */ + public List<List<TTimePartitionSlot>> getTimePartitionRange( + PartialPath seriesPath, Filter timefilter) { + return dataPartition.getTimePartitionRange(seriesPath.getDevice(), timefilter); + } + public List<TRegionReplicaSet> getPartitionInfo(String deviceName, Filter globalTimeFilter) { return dataPartition.getDataRegionReplicaSet(deviceName, globalTimeFilter); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java index c5920833264..f27faab7d7d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java @@ -20,10 +20,12 @@ package org.apache.iotdb.db.queryengine.plan.planner.distribution; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.commons.schema.SchemaConstant; +import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; import org.apache.iotdb.db.queryengine.plan.expression.Expression; @@ -48,6 +50,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDevi import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; @@ -63,7 +66,10 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParameter; +import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem; import java.util.ArrayList; import java.util.Collections; @@ -82,6 +88,12 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte private final Analysis analysis; + private static final OrderByParameter TIME_ASC = + new OrderByParameter(Collections.singletonList(new SortItem(OrderByKey.TIME, Ordering.ASC))); + + private static final OrderByParameter TIME_DESC = + new OrderByParameter(Collections.singletonList(new SortItem(OrderByKey.TIME, Ordering.DESC))); + public SourceRewriter(Analysis analysis) { this.analysis = analysis; } @@ -684,6 +696,226 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte return new LastQueryCollectNode(id, node.isContainsLastTransformNode()); } + @Override + public List<PlanNode> visitInnerTimeJoin( + InnerTimeJoinNode node, DistributionPlanContext context) { + // All child nodes should be SourceNode + List<SeriesSourceNode> seriesScanNodes = new ArrayList<>(node.getChildren().size()); + List<List<List<TTimePartitionSlot>>> sourceTimeRangeList = new ArrayList<>(); + for (PlanNode child : node.getChildren()) { + if (!(child instanceof SeriesSourceNode)) { + throw new IllegalStateException( + "All child nodes of InnerTimeJoinNode should be SeriesSourceNode"); + } + SeriesSourceNode sourceNode = (SeriesSourceNode) child; + seriesScanNodes.add(sourceNode); + sourceTimeRangeList.add( + analysis.getTimePartitionRange( + sourceNode.getPartitionPath(), context.getPartitionTimeFilter())); + } + + List<List<TTimePartitionSlot>> res = splitTimePartition(sourceTimeRangeList); + + List<PlanNode> children = splitInnerTimeJoinNode(res, node, context, seriesScanNodes); + + if (children.size() == 1) { + return children; + } else { + // add merge sort node for InnerTimeJoinNodes + // TODO add new type of Node, just traverse all child nodes sequentially in the future + MergeSortNode mergeSortNode = + new MergeSortNode( + context.queryContext.getQueryId().genPlanNodeId(), + node.getMergeOrder() == Ordering.ASC ? TIME_ASC : TIME_DESC, + node.getOutputColumnNames()); + mergeSortNode.setChildren(children); + return Collections.singletonList(mergeSortNode); + } + } + + private List<List<TTimePartitionSlot>> splitTimePartition( + List<List<List<TTimePartitionSlot>>> childTimePartitionList) { + if (childTimePartitionList.isEmpty()) { + return Collections.emptyList(); + } + List<List<TTimePartitionSlot>> res = new ArrayList<>(childTimePartitionList.get(0)); + for (int i = 1, size = childTimePartitionList.size(); i < size; i++) { + res = combineTwoTimePartitionList(res, childTimePartitionList.get(i)); + } + return res; + } + + private List<List<TTimePartitionSlot>> combineTwoTimePartitionList( + List<List<TTimePartitionSlot>> left, List<List<TTimePartitionSlot>> right) { + int leftIndex = 0; + int leftSize = left.size(); + int rightIndex = 0; + int rightSize = right.size(); + + List<List<TTimePartitionSlot>> res = new ArrayList<>(Math.max(leftSize, rightSize)); + // common case, one SeriesSlot only belongs to one data region + if (leftSize == 1 && rightSize == 1) { + List<TTimePartitionSlot> list = new ArrayList<>(); + List<TTimePartitionSlot> left0 = left.get(0); + List<TTimePartitionSlot> right0 = left.get(0); + + int left0Index = 0; + int left0Size = left0.size(); + int right0Index = 0; + int right0Size = right0.size(); + while (left0Index < left0Size && right0Index < right0Size) { + if (left0.get(left0Index).startTime == right0.get(right0Index).startTime) { + list.add(left0.get(left0Index)); + left0Index++; + right0Index++; + } else if (left0.get(left0Index).startTime < right0.get(right0Index).startTime) { + list.add(left0.get(left0Index)); + left0Index++; + } else { + list.add(right0.get(left0Index)); + right0Index++; + } + } + if (left0Index < left0Size) { + list.addAll(left0Index, left0); + } + if (right0Index < right0Size) { + list.addAll(right0Index, right0); + } + res.add(list); + return res; + } + + int previousResIndex = 0; + res.add(new ArrayList<>()); + int leftCurrentListIndex = 0; + int rightCurrentListIndex = 0; + while (leftIndex < leftSize && rightIndex < rightSize) { + List<TTimePartitionSlot> leftCurrentList = left.get(leftIndex); + List<TTimePartitionSlot> rightCurrentList = left.get(rightIndex); + int leftCurrentListSize = leftCurrentList.size(); + int rightCurrentListSize = rightCurrentList.size(); + while (leftCurrentListIndex < leftCurrentListSize + && rightCurrentListIndex < rightCurrentListSize) { + if (leftCurrentList.get(leftCurrentListIndex).startTime + == rightCurrentList.get(rightCurrentListIndex).startTime) { + // new continuous time range + if ((leftCurrentListIndex == 0 && leftIndex != 0) + || (rightCurrentListIndex == 0 && rightIndex != 0)) { + previousResIndex++; + res.add(new ArrayList<>()); + } + res.get(previousResIndex).add(leftCurrentList.get(leftCurrentListIndex)); + leftCurrentListIndex++; + rightCurrentListIndex++; + } else if (leftCurrentList.get(leftCurrentListIndex).startTime + < rightCurrentList.get(rightCurrentListIndex).startTime) { + // new continuous time range + if (leftCurrentListIndex == 0 && leftIndex != 0) { + previousResIndex++; + res.add(new ArrayList<>()); + } + res.get(previousResIndex).add(leftCurrentList.get(leftCurrentListIndex)); + leftCurrentListIndex++; + } else { + // new continuous time range + if (rightCurrentListIndex == 0 && rightIndex != 0) { + previousResIndex++; + res.add(new ArrayList<>()); + } + res.get(previousResIndex).add(rightCurrentList.get(rightCurrentListIndex)); + rightCurrentListIndex++; + } + } + if (leftCurrentListIndex == leftCurrentListSize) { + leftIndex++; + leftCurrentListIndex = 0; + } + if (rightCurrentListIndex == rightCurrentListSize) { + rightIndex++; + rightCurrentListIndex = 0; + } + } + + if (leftIndex == leftSize) { + while (rightIndex < rightSize) { + if (rightCurrentListIndex == 0 && rightIndex != 0) { + previousResIndex++; + res.add(new ArrayList<>()); + } + res.get(previousResIndex).addAll(rightCurrentListIndex, right.get(rightIndex)); + rightIndex++; + rightCurrentListIndex = 0; + } + } + + if (rightIndex == rightSize) { + while (leftIndex < leftSize) { + if (leftCurrentListIndex == 0 && leftIndex != 0) { + previousResIndex++; + res.add(new ArrayList<>()); + } + res.get(previousResIndex).addAll(leftCurrentListIndex, left.get(leftIndex)); + leftIndex++; + leftCurrentListIndex = 0; + } + } + return res; + } + + private List<PlanNode> splitInnerTimeJoinNode( + List<List<TTimePartitionSlot>> continuousTimeRange, + InnerTimeJoinNode node, + DistributionPlanContext context, + List<SeriesSourceNode> seriesScanNodes) { + List<PlanNode> subInnerJoinNode = new ArrayList<>(continuousTimeRange.size()); + for (List<TTimePartitionSlot> oneRegion : continuousTimeRange) { + if (!oneRegion.isEmpty()) { + InnerTimeJoinNode innerTimeJoinNode = (InnerTimeJoinNode) node.clone(); + innerTimeJoinNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId()); + + List<Long> timePartitionIds = convertToTimePartitionIds(oneRegion); + innerTimeJoinNode.setTimePartitions(timePartitionIds); + + // region group id -> parent InnerTimeJoinNode + Map<Integer, InnerTimeJoinNode> map = new HashMap<>(); + for (SeriesSourceNode sourceNode : seriesScanNodes) { + TRegionReplicaSet dataRegion = + analysis.getPartitionInfo(sourceNode.getPartitionPath(), oneRegion.get(0)); + InnerTimeJoinNode parent = + map.computeIfAbsent( + dataRegion.regionId.id, + k -> { + InnerTimeJoinNode value = (InnerTimeJoinNode) node.clone(); + value.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId()); + value.setTimePartitions(timePartitionIds); + return value; + }); + SeriesSourceNode split = (SeriesSourceNode) sourceNode.clone(); + split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId()); + split.setRegionReplicaSet(dataRegion); + parent.addChild(split); + } + + if (map.size() > 1) { + map.values().forEach(innerTimeJoinNode::addChild); + subInnerJoinNode.add(innerTimeJoinNode); + } else { + subInnerJoinNode.add(map.values().iterator().next()); + } + } + } + return subInnerJoinNode; + } + + private List<Long> convertToTimePartitionIds(List<TTimePartitionSlot> timePartitionSlotList) { + List<Long> res = new ArrayList<>(timePartitionSlotList.size()); + for (TTimePartitionSlot timePartitionSlot : timePartitionSlotList) { + res.add(TimePartitionUtils.getTimePartitionId(timePartitionSlot.startTime)); + } + return res; + } + @Override public List<PlanNode> visitFullOuterTimeJoin( FullOuterTimeJoinNode node, DistributionPlanContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index 48ba7e943c4..fcebd9b0c66 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -44,6 +44,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.LeftOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; @@ -70,6 +72,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter.GraphContext> { @@ -325,10 +328,52 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter return render(node, boxValue, context); } + @Override + public List<String> visitLeftOuterTimeJoin(LeftOuterTimeJoinNode node, GraphContext context) { + List<String> boxValue = new ArrayList<>(); + boxValue.add(String.format("LeftOuterTimeJoin-%s", node.getPlanNodeId().getId())); + boxValue.add(String.format("Order: %s", node.getMergeOrder())); + return render(node, boxValue, context); + } + + @Override + public List<String> visitInnerTimeJoin(InnerTimeJoinNode node, GraphContext context) { + List<String> boxValue = new ArrayList<>(); + boxValue.add(String.format("InnerTimeJoin-%s", node.getPlanNodeId().getId())); + boxValue.add(String.format("Order: %s", node.getMergeOrder())); + Optional<List<Long>> timePartitions = node.getTimePartitions(); + if (timePartitions.isPresent()) { + int size = timePartitions.get().size(); + if (size > 0) { + StringBuilder builder = + new StringBuilder("TimePartitions: [").append(timePartitions.get().get(0)); + for (int i = 1; i < Math.min(size, 4); i++) { + builder.append(",").append(timePartitions.get().get(i)); + } + for (int i = 4; i < size; i += 4) { + builder.append(",").append(System.lineSeparator()); + builder.append(timePartitions.get().get(i)); + int j = i + 1; + while (j < Math.min(i + 4, size)) { + builder.append(",").append(timePartitions.get().get(j)); + } + } + builder.append("]"); + boxValue.add(builder.toString()); + } else { + boxValue.add("TimePartitions: []"); + } + } else { + boxValue.add("TimePartitions: ALL"); + } + + return render(node, boxValue, context); + } + @Override public List<String> visitFullOuterTimeJoin(FullOuterTimeJoinNode node, GraphContext context) { List<String> boxValue = new ArrayList<>(); - boxValue.add(String.format("TimeJoin-%s", node.getPlanNodeId().getId())); + boxValue.add(String.format("FullOuterTimeJoin-%s", node.getPlanNodeId().getId())); boxValue.add(String.format("Order: %s", node.getMergeOrder())); return render(node, boxValue, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/InnerTimeJoinNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/InnerTimeJoinNode.java index 380c25b7de4..6b885d87b37 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/InnerTimeJoinNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/join/InnerTimeJoinNode.java @@ -33,6 +33,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; /** @@ -56,14 +57,23 @@ public class InnerTimeJoinNode extends MultiChildProcessNode { // This parameter indicates the order when executing multiway merge sort. private final Ordering mergeOrder; + // null for all time partitions + // empty for zero time partitions + private List<Long> timePartitions; + public InnerTimeJoinNode(PlanNodeId id, Ordering mergeOrder) { - super(id, new ArrayList<>()); - this.mergeOrder = mergeOrder; + this(id, new ArrayList<>(), mergeOrder, null); } public InnerTimeJoinNode(PlanNodeId id, Ordering mergeOrder, List<PlanNode> children) { + this(id, children, mergeOrder, null); + } + + public InnerTimeJoinNode( + PlanNodeId id, List<PlanNode> children, Ordering mergeOrder, List<Long> timePartitions) { super(id, children); this.mergeOrder = mergeOrder; + this.timePartitions = timePartitions; } public Ordering getMergeOrder() { @@ -79,8 +89,9 @@ public class InnerTimeJoinNode extends MultiChildProcessNode { public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) { return new InnerTimeJoinNode( new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)), + new ArrayList<>(children.subList(startIndex, endIndex)), getMergeOrder(), - new ArrayList<>(children.subList(startIndex, endIndex))); + timePartitions); } @Override @@ -100,18 +111,45 @@ public class InnerTimeJoinNode extends MultiChildProcessNode { protected void serializeAttributes(ByteBuffer byteBuffer) { PlanNodeType.INNER_TIME_JOIN.serialize(byteBuffer); ReadWriteIOUtils.write(mergeOrder.ordinal(), byteBuffer); + if (timePartitions == null) { + ReadWriteIOUtils.write(false, byteBuffer); + } else { + ReadWriteIOUtils.write(true, byteBuffer); + ReadWriteIOUtils.write(timePartitions.size(), byteBuffer); + for (Long timePartitionId : timePartitions) { + ReadWriteIOUtils.write(timePartitionId, byteBuffer); + } + } } @Override protected void serializeAttributes(DataOutputStream stream) throws IOException { PlanNodeType.INNER_TIME_JOIN.serialize(stream); ReadWriteIOUtils.write(mergeOrder.ordinal(), stream); + if (timePartitions == null) { + ReadWriteIOUtils.write(false, stream); + } else { + ReadWriteIOUtils.write(true, stream); + ReadWriteIOUtils.write(timePartitions.size(), stream); + for (Long timePartitionId : timePartitions) { + ReadWriteIOUtils.write(timePartitionId, stream); + } + } } public static InnerTimeJoinNode deserialize(ByteBuffer byteBuffer) { Ordering mergeOrder = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)]; + List<Long> timePartitionIds = null; + boolean hasTimePartitionIds = ReadWriteIOUtils.readBool(byteBuffer); + if (hasTimePartitionIds) { + int size = ReadWriteIOUtils.read(byteBuffer); + timePartitionIds = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + timePartitionIds.add(ReadWriteIOUtils.readLong(byteBuffer)); + } + } PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); - return new InnerTimeJoinNode(planNodeId, mergeOrder); + return new InnerTimeJoinNode(planNodeId, new ArrayList<>(), mergeOrder, timePartitionIds); } @Override @@ -138,4 +176,14 @@ public class InnerTimeJoinNode extends MultiChildProcessNode { public int hashCode() { return Objects.hash(super.hashCode(), mergeOrder); } + + public void setTimePartitions(List<Long> timePartitions) { + this.timePartitions = timePartitions; + } + + // null for all time partitions + // empty for zero time partitions + public Optional<List<Long>> getTimePartitions() { + return Optional.ofNullable(timePartitions); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java index c8425398083..07927bb4f24 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java @@ -28,10 +28,12 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; + +import static java.util.stream.Collectors.toList; // TODO: Remove this class public class DataPartition extends Partition { @@ -71,6 +73,49 @@ public class DataPartition extends Partition { this.dataPartitionMap = dataPartitionMap; } + public List<List<TTimePartitionSlot>> getTimePartitionRange( + String deviceName, Filter timeFilter) { + String storageGroup = getStorageGroupByDevice(deviceName); + TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName); + if (!dataPartitionMap.containsKey(storageGroup) + || !dataPartitionMap.get(storageGroup).containsKey(seriesPartitionSlot)) { + return Collections.emptyList(); + } + + List<List<TTimePartitionSlot>> res = new ArrayList<>(); + Map<TTimePartitionSlot, List<TRegionReplicaSet>> map = + dataPartitionMap.get(storageGroup).get(seriesPartitionSlot); + List<TTimePartitionSlot> timePartitionSlotList = + map.keySet().stream() + .filter(key -> TimePartitionUtils.satisfyPartitionStartTime(timeFilter, key.startTime)) + .sorted(Comparator.comparingLong(TTimePartitionSlot::getStartTime)) + .collect(toList()); + + if (timePartitionSlotList.isEmpty()) { + return res; + } + + int previousRegionId = map.get(timePartitionSlotList.get(0)).get(0).regionId.id; + int previousIndex = 0; + res.add(new ArrayList<>()); + res.get(previousIndex).add(timePartitionSlotList.get(0)); + + for (int i = 1, size = timePartitionSlotList.size(); i < size; i++) { + int currentRegionId = map.get(timePartitionSlotList.get(i)).get(0).regionId.id; + // region id of current time partition is same as previous + if (currentRegionId == previousRegionId) { + res.get(previousIndex).add(timePartitionSlotList.get(i)); + } else { + previousIndex++; + previousRegionId = currentRegionId; + res.add(new ArrayList<>()); + res.get(previousIndex).add(timePartitionSlotList.get(i)); + } + } + + return res; + } + public List<TRegionReplicaSet> getDataRegionReplicaSet(String deviceName, Filter timeFilter) { String storageGroup = getStorageGroupByDevice(deviceName); TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName); @@ -84,7 +129,30 @@ public class DataPartition extends Partition { TimePartitionUtils.satisfyPartitionStartTime(timeFilter, entry.getKey().startTime)) .flatMap(entry -> entry.getValue().stream()) .distinct() - .collect(Collectors.toList()); + .collect(toList()); + } + + public List<TRegionReplicaSet> getDataRegionReplicaSet( + String deviceName, TTimePartitionSlot tTimePartitionSlot) { + String storageGroup = getStorageGroupByDevice(deviceName); + Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> dbMap = + dataPartitionMap.get(storageGroup); + if (dbMap == null) { + return Collections.singletonList(NOT_ASSIGNED); + } + TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName); + Map<TTimePartitionSlot, List<TRegionReplicaSet>> seriesSlotMap = dbMap.get(seriesPartitionSlot); + if (seriesSlotMap == null) { + return Collections.singletonList(NOT_ASSIGNED); + } + + List<TRegionReplicaSet> regionReplicaSets = seriesSlotMap.get(tTimePartitionSlot); + + if (regionReplicaSets == null) { + return Collections.singletonList(NOT_ASSIGNED); + } + + return regionReplicaSets; } public List<TRegionReplicaSet> getDataRegionReplicaSetForWriting( @@ -155,7 +223,7 @@ public class DataPartition extends Partition { partition.entrySet().stream() .flatMap( s -> s.getValue().entrySet().stream().flatMap(e -> e.getValue().stream())) - .collect(Collectors.toList()); + .collect(toList()); for (TRegionReplicaSet regionReplicaSet : ret) { distributionMap .computeIfAbsent(regionReplicaSet, RegionReplicaSetInfo::new)
