This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 464cbfe44d [IOTDB-3366] fix npe when no data when show latest
timeseries. (#6126)
464cbfe44d is described below
commit 464cbfe44dbe9bb85226f6e32615641b49dc7058
Author: ZhangHongYin <[email protected]>
AuthorDate: Thu Jun 2 14:47:16 2022 +0800
[IOTDB-3366] fix npe when no data when show latest timeseries. (#6126)
---
.../operator/process/LastQueryMergeOperator.java | 7 +-
.../schema/SchemaQueryOrderByHeatOperator.java | 74 ++++++++++++----------
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 8 ++-
.../iotdb/db/mpp/plan/planner/LogicalPlanner.java | 1 +
.../metedata/read/SchemaQueryOrderByHeatNode.java | 33 +++-------
5 files changed, 61 insertions(+), 62 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
index 261abdeb12..70ccf2134f 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
@@ -50,7 +51,11 @@ public class LastQueryMergeOperator implements
ProcessOperator {
@Override
public ListenableFuture<Void> isBlocked() {
- return children.get(currentIndex).isBlocked();
+ if (currentIndex < inputOperatorsCount) {
+ return children.get(currentIndex).isBlocked();
+ } else {
+ return Futures.immediateVoidFuture();
+ }
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
index 24d3efd37d..22b7156f28 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
@@ -40,19 +40,18 @@ import static java.util.Objects.requireNonNull;
public class SchemaQueryOrderByHeatOperator implements ProcessOperator {
private final OperatorContext operatorContext;
- private final Operator left;
- private final Operator right;
private boolean isFinished = false;
- private final List<TsBlock> leftResult;
- private final List<TsBlock> rightResult;
+ private final List<Operator> operators;
+ private final boolean[] noMoreTsBlocks;
+ private final List<TsBlock> showTimeSeriesResult;
+ private final List<TsBlock> lastQueryResult;
- public SchemaQueryOrderByHeatOperator(
- OperatorContext operatorContext, Operator left, Operator right) {
+ public SchemaQueryOrderByHeatOperator(OperatorContext operatorContext,
List<Operator> operators) {
this.operatorContext = requireNonNull(operatorContext, "operatorContext is
null");
- this.left = requireNonNull(left, "left child operator is null");
- this.right = requireNonNull(right, "right child operator is null");
- this.leftResult = new ArrayList<>();
- this.rightResult = new ArrayList<>();
+ this.operators = operators;
+ this.noMoreTsBlocks = new boolean[operators.size()];
+ this.showTimeSeriesResult = new ArrayList<>();
+ this.lastQueryResult = new ArrayList<>();
}
@Override
@@ -64,10 +63,7 @@ public class SchemaQueryOrderByHeatOperator implements
ProcessOperator {
// Step 1: get last point result
Map<String, Long> timeseriesToLastTimestamp = new HashMap<>();
- for (TsBlock tsBlock : rightResult) {
- if (null == tsBlock || tsBlock.isEmpty()) {
- continue;
- }
+ for (TsBlock tsBlock : lastQueryResult) {
for (int i = 0; i < tsBlock.getPositionCount(); i++) {
String timeseries = tsBlock.getColumn(0).getBinary(i).toString();
long time = tsBlock.getTimeByIndex(i);
@@ -77,10 +73,7 @@ public class SchemaQueryOrderByHeatOperator implements
ProcessOperator {
// Step 2: get last point timestamp to timeseries map
Map<Long, List<Object[]>> lastTimestampToTsSchema = new HashMap<>();
- for (TsBlock tsBlock : leftResult) {
- if (null == tsBlock || tsBlock.isEmpty()) {
- continue;
- }
+ for (TsBlock tsBlock : showTimeSeriesResult) {
TsBlock.TsBlockRowIterator tsBlockRowIterator =
tsBlock.getTsBlockRowIterator();
while (tsBlockRowIterator.hasNext()) {
Object[] line = tsBlockRowIterator.next();
@@ -124,25 +117,35 @@ public class SchemaQueryOrderByHeatOperator implements
ProcessOperator {
@Override
public ListenableFuture<Void> isBlocked() {
- ListenableFuture<Void> blocked = left.isBlocked();
- while (left.hasNext() && blocked.isDone()) {
- leftResult.add(left.next());
- blocked = left.isBlocked();
- }
- if (!blocked.isDone()) {
- return blocked;
- }
- blocked = right.isBlocked();
- while (right.hasNext() && blocked.isDone()) {
- rightResult.add(right.next());
- blocked = right.isBlocked();
- }
- if (!blocked.isDone()) {
- return blocked;
+ for (int i = 0; i < operators.size(); i++) {
+ if (!noMoreTsBlocks[i]) {
+ Operator operator = operators.get(i);
+ ListenableFuture<Void> blocked = operator.isBlocked();
+ while (operator.hasNext() && blocked.isDone()) {
+ TsBlock tsBlock = operator.next();
+ if (null != tsBlock && !tsBlock.isEmpty()) {
+ if (isShowTimeSeriesBlock(tsBlock)) {
+ showTimeSeriesResult.add(tsBlock);
+ } else {
+ lastQueryResult.add(tsBlock);
+ }
+ }
+ blocked = operator.isBlocked();
+ }
+ if (!blocked.isDone()) {
+ return blocked;
+ }
+ noMoreTsBlocks[i] = true;
+ }
}
return NOT_BLOCKED;
}
+ private boolean isShowTimeSeriesBlock(TsBlock tsBlock) {
+ return tsBlock.getValueColumnCount()
+ == HeaderConstant.showTimeSeriesHeader.getOutputValueColumnCount();
+ }
+
@Override
public boolean hasNext() {
return !isFinished;
@@ -150,8 +153,9 @@ public class SchemaQueryOrderByHeatOperator implements
ProcessOperator {
@Override
public void close() throws Exception {
- left.close();
- right.close();
+ for (Operator operator : operators) {
+ operator.close();
+ }
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 1b30176d58..f85a8eb809 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -349,8 +349,10 @@ public class LocalExecutionPlanner {
@Override
public Operator visitSchemaQueryOrderByHeat(
SchemaQueryOrderByHeatNode node, LocalExecutionPlanContext context) {
- Operator left = node.getLeft().accept(this, context);
- Operator right = node.getRight().accept(this, context);
+ List<Operator> children =
+ node.getChildren().stream()
+ .map(n -> n.accept(this, context))
+ .collect(Collectors.toList());
OperatorContext operatorContext =
context.instanceContext.addOperatorContext(
@@ -358,7 +360,7 @@ public class LocalExecutionPlanner {
node.getPlanNodeId(),
SchemaQueryOrderByHeatOperator.class.getSimpleName());
- return new SchemaQueryOrderByHeatOperator(operatorContext, left, right);
+ return new SchemaQueryOrderByHeatOperator(operatorContext, children);
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index 44b243012c..0364111bb6 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -458,6 +458,7 @@ public class LogicalPlanner {
.planSchemaQueryMerge(showTimeSeriesStatement.isOrderByHeat());
// show latest timeseries
if (showTimeSeriesStatement.isOrderByHeat()
+ && null != analysis.getDataPartitionInfo()
&& 0 !=
analysis.getDataPartitionInfo().getDataPartitionMap().size()) {
PlanNode lastPlanNode =
new LogicalPlanBuilder(context)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryOrderByHeatNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryOrderByHeatNode.java
index a0cd47b48c..93df4f4bd6 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryOrderByHeatNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryOrderByHeatNode.java
@@ -18,14 +18,13 @@
*/
package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read;
+import org.apache.iotdb.db.mpp.common.header.HeaderConstant;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
-import com.google.common.collect.ImmutableList;
-
import java.nio.ByteBuffer;
import java.util.List;
@@ -35,32 +34,14 @@ public class SchemaQueryOrderByHeatNode extends
MultiChildNode {
super(id);
}
- /** show timeseries */
- private PlanNode left;
-
- /** last point */
- private PlanNode right;
-
- public PlanNode getLeft() {
- return left;
- }
-
- public PlanNode getRight() {
- return right;
- }
-
@Override
public List<PlanNode> getChildren() {
- return ImmutableList.of(left, right);
+ return children;
}
@Override
public void addChild(PlanNode child) {
- if (child instanceof SchemaQueryMergeNode) {
- left = child;
- } else {
- right = child;
- }
+ children.add(child);
}
@Override
@@ -75,7 +56,13 @@ public class SchemaQueryOrderByHeatNode extends
MultiChildNode {
@Override
public List<String> getOutputColumnNames() {
- return left.getOutputColumnNames();
+ for (PlanNode child : children) {
+ if (child.getOutputColumnNames().size()
+ == HeaderConstant.showTimeSeriesHeader.getOutputValueColumnCount() +
1) {
+ return child.getOutputColumnNames();
+ }
+ }
+ return children.get(0).getOutputColumnNames();
}
@Override