This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cabb8fff2f [IOTDB-3244] Implementation of LastOperator (#6001)
cabb8fff2f is described below
commit cabb8fff2f0d7538bbe5d980838078b1baee4343
Author: Jackie Tien <[email protected]>
AuthorDate: Wed May 25 19:39:28 2022 +0800
[IOTDB-3244] Implementation of LastOperator (#6001)
---
.../db/mpp/execution/operator/LastQueryUtil.java | 79 +++++
.../operator/process/LastQueryMergeOperator.java | 82 ++++++
.../operator/process/UpdateLastCacheOperator.java | 127 ++++++++
.../operator/source/LastCacheScanOperator.java | 64 ++++
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 235 +++++++++++++++
.../plan/node/process/LastQueryMergeNode.java | 32 +-
.../plan/node/source/AlignedLastQueryScanNode.java | 42 +--
.../plan/node/source/LastQueryScanNode.java | 42 +--
.../iotdb/db/query/executor/LastQueryExecutor.java | 5 +-
.../operator/LastCacheScanOperatorTest.java | 93 ++++++
.../operator/LastQueryMergeOperatorTest.java | 327 +++++++++++++++++++++
.../SeriesAggregationScanOperatorTest.java | 3 +-
.../operator/UpdateLastCacheOperatorTest.java | 219 ++++++++++++++
13 files changed, 1266 insertions(+), 84 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
new file mode 100644
index 0000000000..19e9571171
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.LastValueDescAccumulator;
+import org.apache.iotdb.db.mpp.aggregation.MaxTimeDescAccumulator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.Gt;
+import org.apache.iotdb.tsfile.read.filter.operator.GtEq;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Static helpers shared by the last query operators and the local execution planner.
+ *
+ * <p>A last query result block built here has a time column followed by three TEXT columns, in
+ * this order: timeseries, value, dataType.
+ */
+public class LastQueryUtil {
+
+  private LastQueryUtil() {
+    // static utility class, not meant to be instantiated
+  }
+
+  /** Creates a builder for a last query result block without an initial size hint. */
+  public static TsBlockBuilder createTsBlockBuilder() {
+    return new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+  }
+
+  /**
+   * Creates a builder for a last query result block.
+   *
+   * @param initialExpectedEntries handed to the {@link TsBlockBuilder} constructor as is
+   */
+  public static TsBlockBuilder createTsBlockBuilder(int initialExpectedEntries) {
+    return new TsBlockBuilder(
+        initialExpectedEntries,
+        ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+  }
+
+  /**
+   * Appends one row to {@code builder} and declares the new position.
+   *
+   * @param builder builder whose value columns 0, 1 and 2 are all written as binary
+   * @param lastTime value written to the time column
+   * @param fullPath value written to column 0 (timeseries)
+   * @param lastValue value written to column 1 (value)
+   * @param dataType value written to column 2 (dataType)
+   */
+  public static void appendLastValue(
+      TsBlockBuilder builder, long lastTime, String fullPath, String lastValue, String dataType) {
+    // Time
+    builder.getTimeColumnBuilder().writeLong(lastTime);
+    // timeseries
+    builder.getColumnBuilder(0).writeBinary(new Binary(fullPath));
+    // value
+    builder.getColumnBuilder(1).writeBinary(new Binary(lastValue));
+    // dataType
+    builder.getColumnBuilder(2).writeBinary(new Binary(dataType));
+    builder.declarePosition();
+  }
+
+  /**
+   * Checks a time-value pair against an optional filter.
+   *
+   * @return {@code true} when {@code filter} is {@code null} or when it is satisfied by the
+   *     timestamp and value of {@code tvPair}
+   */
+  public static boolean satisfyFilter(Filter filter, TimeValuePair tvPair) {
+    return filter == null || filter.satisfy(tvPair.getTimestamp(), tvPair.getValue().getValue());
+  }
+
+  /**
+   * Creates the aggregators used to read the last point of a series.
+   *
+   * @param dataType data type handed to the last_value accumulator
+   * @return a list of exactly two SINGLE-step aggregators: max_time first, last_value second
+   */
+  public static List<Aggregator> createAggregators(TSDataType dataType) {
+    // max_time, last_value
+    List<Aggregator> aggregators = new ArrayList<>(2);
+    aggregators.add(new Aggregator(new MaxTimeDescAccumulator(), AggregationStep.SINGLE));
+    aggregators.add(new Aggregator(new LastValueDescAccumulator(dataType), AggregationStep.SINGLE));
+    return aggregators;
+  }
+
+  /**
+   * Tells whether the result of a last query with the given time filter may update the last cache.
+   *
+   * @return {@code true} only for a {@link Gt} or {@link GtEq} filter; {@code false} for any other
+   *     filter, including {@code null}
+   */
+  public static boolean needUpdateCache(Filter timeFilter) {
+    // Update the cache only when the filter is gt (greater than) or ge (greater than or equal to)
+    return (timeFilter instanceof GtEq) || (timeFilter instanceof Gt);
+  }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
new file mode 100644
index 0000000000..261abdeb12
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.List;
+
+/**
+ * Emits the blocks of its children one child after another: the current child is read until its
+ * {@code hasNext()} returns {@code false}, then the operator moves on to the next child.
+ *
+ * <p>The list of children may be empty; in that case the operator is finished from the start.
+ */
+public class LastQueryMergeOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+
+  private final List<Operator> children;
+
+  private final int inputOperatorsCount;
+
+  // index of the child currently being read; equal to inputOperatorsCount once all are drained
+  private int currentIndex;
+
+  public LastQueryMergeOperator(OperatorContext operatorContext, List<Operator> children) {
+    this.operatorContext = operatorContext;
+    this.children = children;
+    this.inputOperatorsCount = children.size();
+    this.currentIndex = 0;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  /**
+   * Returns the blocked future of the child currently being read, or an already completed future
+   * when no child is left.
+   */
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    if (currentIndex >= inputOperatorsCount) {
+      // No child is left (or there never was one), so there is nothing to wait for. Indexing
+      // into children here would throw IndexOutOfBoundsException.
+      return com.google.common.util.concurrent.Futures.immediateFuture(null);
+    }
+    return children.get(currentIndex).isBlocked();
+  }
+
+  /**
+   * Returns the next block of the current child. Returns {@code null} when the current child has
+   * just been exhausted (the operator then advances to the next child) or when no child is left.
+   */
+  @Override
+  public TsBlock next() {
+    if (currentIndex >= inputOperatorsCount) {
+      return null;
+    }
+    Operator currentChild = children.get(currentIndex);
+    if (currentChild.hasNext()) {
+      return currentChild.next();
+    }
+    currentIndex++;
+    return null;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return currentIndex < inputOperatorsCount;
+  }
+
+  @Override
+  public boolean isFinished() {
+    return !hasNext();
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (Operator child : children) {
+      child.close();
+    }
+  }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
new file mode 100644
index 0000000000..ee0eb898a8
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Turns the single-row result of its child into a last query row (time, timeseries, value,
+ * dataType) and, when asked to, writes that point into the last cache first.
+ */
+public class UpdateLastCacheOperator implements ProcessOperator {
+
+  // shared result for a child block that carries no row
+  private static final TsBlock LAST_QUERY_EMPTY_TSBLOCK =
+      new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT))
+          .build();
+
+  private final OperatorContext operatorContext;
+
+  private final Operator child;
+
+  // full path of the queried time series
+  private final PartialPath fullPath;
+
+  // name of the data type of the queried time series
+  private final String dataType;
+
+  private final DataNodeSchemaCache lastCache;
+
+  private final boolean needUpdateCache;
+
+  // reused for every returned block; reset before each use
+  private final TsBlockBuilder tsBlockBuilder;
+
+  public UpdateLastCacheOperator(
+      OperatorContext operatorContext,
+      Operator child,
+      PartialPath fullPath,
+      TSDataType dataType,
+      DataNodeSchemaCache dataNodeSchemaCache,
+      boolean needUpdateCache) {
+    this.operatorContext = operatorContext;
+    this.child = child;
+    this.fullPath = fullPath;
+    this.dataType = dataType.name();
+    this.lastCache = dataNodeSchemaCache;
+    this.needUpdateCache = needUpdateCache;
+    this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder(1);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return child.isBlocked();
+  }
+
+  /**
+   * Converts the next child block into a last query block.
+   *
+   * @return {@code null} when the child returns {@code null}, the shared empty block when the
+   *     child block is empty, otherwise a one-row block built from the child's row
+   */
+  @Override
+  public TsBlock next() {
+    final TsBlock childBlock = child.next();
+    if (childBlock == null || childBlock.isEmpty()) {
+      return childBlock == null ? null : LAST_QUERY_EMPTY_TSBLOCK;
+    }
+
+    checkArgument(
+        childBlock.getPositionCount() == 1, "last query result should only have one record");
+
+    // column 0 is read as the time, column 1 as the value of the last point
+    final long time = childBlock.getColumn(0).getLong(0);
+    final TsPrimitiveType value = childBlock.getColumn(1).getTsPrimitiveType(0);
+
+    if (needUpdateCache) {
+      lastCache.updateLastCache(fullPath, new TimeValuePair(time, value), false, Long.MIN_VALUE);
+    }
+
+    tsBlockBuilder.reset();
+    LastQueryUtil.appendLastValue(
+        tsBlockBuilder, time, fullPath.getFullPath(), value.getStringValue(), dataType);
+    return tsBlockBuilder.build();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return child.hasNext();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return child.isFinished();
+  }
+
+  @Override
+  public void close() throws Exception {
+    child.close();
+  }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
new file mode 100644
index 0000000000..dfb6d82c5c
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.source;
+
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+/**
+ * Source operator that hands out one pre-built block exactly once. An absent or empty block makes
+ * the operator finished from the start.
+ */
+public class LastCacheScanOperator implements SourceOperator {
+
+  private final OperatorContext operatorContext;
+  private final PlanNodeId sourceId;
+  // set to null once the block has been handed out
+  private TsBlock tsBlock;
+
+  public LastCacheScanOperator(
+      OperatorContext operatorContext, PlanNodeId sourceId, TsBlock tsBlock) {
+    this.operatorContext = operatorContext;
+    this.sourceId = sourceId;
+    this.tsBlock = tsBlock;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  /** Returns the held block and forgets it, so that a second call returns {@code null}. */
+  @Override
+  public TsBlock next() {
+    try {
+      return tsBlock;
+    } finally {
+      tsBlock = null;
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return !(tsBlock == null || tsBlock.isEmpty());
+  }
+
+  @Override
+  public boolean isFinished() {
+    return !hasNext();
+  }
+
+  @Override
+  public PlanNodeId getSourceId() {
+    return sourceId;
+  }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 736bc5feef..9e7d99cae9 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.db.mpp.plan.planner;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
@@ -35,6 +37,7 @@ import
org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
@@ -42,6 +45,7 @@ import
org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.FilterOperator;
+import
org.apache.iotdb.db.mpp.execution.operator.process.LastQueryMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
@@ -49,6 +53,7 @@ import
org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
import
org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
+import
org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
import
org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BinaryConstantFill;
import
org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BooleanConstantFill;
@@ -90,11 +95,13 @@ import
org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesAggregatio
import
org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
import
org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildNodesSchemaScanNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildPathsSchemaScanNode;
@@ -117,14 +124,17 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNullNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
@@ -136,7 +146,11 @@ import
org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
import org.apache.iotdb.db.mpp.plan.statement.literal.Literal;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.Gt;
+import org.apache.iotdb.tsfile.read.filter.operator.GtEq;
import org.apache.commons.lang3.Validate;
@@ -148,11 +162,13 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
+import static
org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil.satisfyFilter;
/**
* Used to plan a fragment instance. Currently, we simply change it from
PlanNode to executable
@@ -164,6 +180,9 @@ public class LocalExecutionPlanner {
private static final DataBlockManager DATA_BLOCK_MANAGER =
DataBlockService.getInstance().getDataBlockManager();
+ private static final DataNodeSchemaCache DATA_NODE_SCHEMA_CACHE =
+ DataNodeSchemaCache.getInstance();
+
private static final TimeComparator ASC_TIME_COMPARATOR = new
AscTimeComparator();
private static final TimeComparator DESC_TIME_COMPARATOR = new
DescTimeComparator();
@@ -984,6 +1003,187 @@ public class LocalExecutionPlanner {
((SchemaDriverContext)
(context.instanceContext.getDriverContext())).getSchemaRegion());
}
+ /**
+  * Plans one series of a last query.
+  *
+  * @return an operator that reads the last point from storage, or {@code null} when no read is
+  *     planned for this series (its cached value was recorded in the context, or it was skipped)
+  */
+ @Override
+ public Operator visitLastQueryScan(LastQueryScanNode node, LocalExecutionPlanContext context) {
+   final TimeValuePair cachedLast = DATA_NODE_SCHEMA_CACHE.getLastCache(node.getSeriesPath());
+
+   // last value is not cached
+   if (cachedLast == null) {
+     return createUpdateLastCacheOperator(node, context);
+   }
+
+   // cached last value is satisfied, put it into LastCacheScanOperator
+   if (satisfyFilter(context.lastQueryTimeFilter, cachedLast)) {
+     context.addCachedLastValue(
+         cachedLast, node.getPlanNodeId(), node.getSeriesPath().getFullPath());
+     return null;
+   }
+
+   // cached last value is not satisfied
+   final boolean isFilterGtOrGe =
+       context.lastQueryTimeFilter instanceof Gt || context.lastQueryTimeFilter instanceof GtEq;
+   // With a > or >= filter the series is ignored (presumably nothing newer than the cached last
+   // value can exist); with any other filter we still need to read from disk.
+   return isFilterGtOrGe ? null : createUpdateLastCacheOperator(node, context);
+ }
+
+ /**
+  * Builds the scan operator for {@code node} and wraps it in an {@link UpdateLastCacheOperator}.
+  * The scan operator is created first, so it takes its operator id before the wrapper does.
+  */
+ private UpdateLastCacheOperator createUpdateLastCacheOperator(
+     LastQueryScanNode node, LocalExecutionPlanContext context) {
+   final SeriesAggregationScanOperator scanOperator = createLastQueryScanOperator(node, context);
+
+   final OperatorContext updateContext =
+       context.instanceContext.addOperatorContext(
+           context.getNextOperatorId(),
+           node.getPlanNodeId(),
+           UpdateLastCacheOperator.class.getSimpleName());
+
+   return new UpdateLastCacheOperator(
+       updateContext,
+       scanOperator,
+       node.getSeriesPath(),
+       node.getSeriesPath().getSeriesType(),
+       DATA_NODE_SCHEMA_CACHE,
+       context.needUpdateLastCache);
+ }
+
+ /**
+  * Creates the aggregation scan that computes max_time and last_value for the series of {@code
+  * node} under the last query time filter, and registers the operator and its path in {@code
+  * context}.
+  */
+ private SeriesAggregationScanOperator createLastQueryScanOperator(
+     LastQueryScanNode node, LocalExecutionPlanContext context) {
+   final MeasurementPath path = node.getSeriesPath();
+   final OperatorContext scanContext =
+       context.instanceContext.addOperatorContext(
+           context.getNextOperatorId(),
+           node.getPlanNodeId(),
+           SeriesAggregationScanOperator.class.getSimpleName());
+
+   // last_time, last_value
+   final List<Aggregator> lastAggregators = LastQueryUtil.createAggregators(path.getSeriesType());
+
+   final SeriesAggregationScanOperator scanOperator =
+       new SeriesAggregationScanOperator(
+           node.getPlanNodeId(),
+           path,
+           context.getAllSensors(path.getDevice(), path.getMeasurement()),
+           scanContext,
+           lastAggregators,
+           context.lastQueryTimeFilter,
+           false,
+           null);
+
+   context.addSourceOperator(scanOperator);
+   context.addPath(path);
+   return scanOperator;
+ }
+
+ /**
+  * Plans one aligned series of a last query.
+  *
+  * @return an operator that reads the last point from storage, or {@code null} when no read is
+  *     planned for this series (its cached value was recorded in the context, or it was skipped)
+  */
+ @Override
+ public Operator visitAlignedLastQueryScan(
+     AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
+   final TimeValuePair cachedLast = DATA_NODE_SCHEMA_CACHE.getLastCache(node.getSeriesPath());
+
+   final boolean isCached = cachedLast != null;
+   if (isCached && satisfyFilter(context.lastQueryTimeFilter, cachedLast)) {
+     // cached last value is satisfied, put it into LastCacheScanOperator
+     context.addCachedLastValue(
+         cachedLast, node.getPlanNodeId(), node.getSeriesPath().getFullPath());
+     return null;
+   }
+
+   if (isCached) {
+     // cached last value is not satisfied: with a > or >= time filter we just ignore the series
+     final Filter timeFilter = context.lastQueryTimeFilter;
+     if (timeFilter instanceof Gt || timeFilter instanceof GtEq) {
+       return null;
+     }
+   }
+
+   // last value is not cached, or the time filter is not > or >=: read from disk
+   return createUpdateLastCacheOperator(node, context);
+ }
+
+ /**
+  * Builds the aligned scan operator for {@code node} and wraps it in an {@link
+  * UpdateLastCacheOperator}. The scan operator is created first, so it takes its operator id
+  * before the wrapper does.
+  */
+ private UpdateLastCacheOperator createUpdateLastCacheOperator(
+     AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
+   final AlignedSeriesAggregationScanOperator alignedScan =
+       createLastQueryScanOperator(node, context);
+
+   final OperatorContext wrapperContext =
+       context.instanceContext.addOperatorContext(
+           context.getNextOperatorId(),
+           node.getPlanNodeId(),
+           UpdateLastCacheOperator.class.getSimpleName());
+
+   return new UpdateLastCacheOperator(
+       wrapperContext,
+       alignedScan,
+       node.getSeriesPath(),
+       node.getSeriesPath().getSeriesType(),
+       DATA_NODE_SCHEMA_CACHE,
+       context.needUpdateLastCache);
+ }
+
+ /**
+  * Creates the aligned aggregation scan that computes max_time and last_value for the series of
+  * {@code node} under the last query time filter, and registers the operator and its path in
+  * {@code context}.
+  */
+ private AlignedSeriesAggregationScanOperator createLastQueryScanOperator(
+     AlignedLastQueryScanNode node, LocalExecutionPlanContext context) {
+   final AlignedPath alignedPath = node.getSeriesPath();
+   final OperatorContext scanContext =
+       context.instanceContext.addOperatorContext(
+           context.getNextOperatorId(),
+           node.getPlanNodeId(),
+           AlignedSeriesAggregationScanOperator.class.getSimpleName());
+
+   // last_time, last_value
+   final List<Aggregator> lastAggregators =
+       LastQueryUtil.createAggregators(alignedPath.getSeriesType());
+
+   final AlignedSeriesAggregationScanOperator alignedScan =
+       new AlignedSeriesAggregationScanOperator(
+           node.getPlanNodeId(),
+           alignedPath,
+           scanContext,
+           lastAggregators,
+           context.lastQueryTimeFilter,
+           false,
+           null);
+
+   context.addSourceOperator(alignedScan);
+   context.addPath(alignedPath);
+   return alignedScan;
+ }
+
+ /**
+  * Plans a last query merge: visits every child, gathers the values that were answered from the
+  * last cache into a single {@link LastCacheScanOperator}, and merges that operator with the
+  * operators returned by the children.
+  */
+ @Override
+ public Operator visitLastQueryMerge(
+     LastQueryMergeNode node, LocalExecutionPlanContext context) {
+
+   // the children read both settings from the context while they are visited below
+   final Filter timeFilter = node.getTimeFilter();
+   context.setLastQueryTimeFilter(timeFilter);
+   context.setNeedUpdateLastCache(LastQueryUtil.needUpdateCache(timeFilter));
+
+   // children for which no read was planned return null and are dropped here
+   final List<Operator> mergedChildren =
+       node.getChildren().stream()
+           .map(childNode -> childNode.accept(this, context))
+           .filter(Objects::nonNull)
+           .collect(Collectors.toList());
+
+   final List<TimeValuePair> cachedValues = context.getCachedLastValueList();
+   final boolean hasCachedValues = cachedValues != null && !cachedValues.isEmpty();
+
+   if (hasCachedValues) {
+     final TsBlockBuilder cachedBlockBuilder =
+         LastQueryUtil.createTsBlockBuilder(cachedValues.size());
+     // cachedLastValuePathList holds the path of the value stored at the same index
+     int position = 0;
+     for (TimeValuePair cached : cachedValues) {
+       final String fullPath = context.cachedLastValuePathList.get(position);
+       position++;
+       LastQueryUtil.appendLastValue(
+           cachedBlockBuilder,
+           cached.getTimestamp(),
+           fullPath,
+           cached.getValue().getStringValue(),
+           cached.getValue().getDataType().name());
+     }
+
+     final OperatorContext cacheScanContext =
+         context.instanceContext.addOperatorContext(
+             context.getNextOperatorId(),
+             context.firstCachedPlanNodeId,
+             LastCacheScanOperator.class.getSimpleName());
+     mergedChildren.add(
+         new LastCacheScanOperator(
+             cacheScanContext, context.firstCachedPlanNodeId, cachedBlockBuilder.build()));
+   }
+
+   final OperatorContext mergeContext =
+       context.instanceContext.addOperatorContext(
+           context.getNextOperatorId(),
+           node.getPlanNodeId(),
+           LastQueryMergeOperator.class.getSimpleName());
+   return new LastQueryMergeOperator(mergeContext, mergedChildren);
+ }
+
+
private Map<String, List<InputLocation>> makeLayout(PlanNode node) {
Map<String, List<InputLocation>> outputMappings = new LinkedHashMap<>();
int tsBlockIndex = 0;
@@ -1044,6 +1244,18 @@ public class LocalExecutionPlanner {
private TypeProvider typeProvider;
+ // cached last value in last query
+ private List<TimeValuePair> cachedLastValueList;
+ // full path for each cached last value, this size should be equal to
cachedLastValueList
+ private List<String> cachedLastValuePathList;
+ // PlanNodeId of the first LastQueryScanNode/AlignedLastQueryScanNode whose last value was
+ // served from the cache; it's used as the sourceId of LastCacheScanOperator
+ private PlanNodeId firstCachedPlanNodeId;
+ // timeFilter for last query
+ private Filter lastQueryTimeFilter;
+ // whether we need to update last cache
+ private boolean needUpdateLastCache;
+
public LocalExecutionPlanContext(
TypeProvider typeProvider, FragmentInstanceContext instanceContext) {
this.typeProvider = typeProvider;
@@ -1086,6 +1298,29 @@ public class LocalExecutionPlanner {
sourceOperators.add(sourceOperator);
}
+ /** Sets the time filter used while planning the scan nodes of a last query. */
+ public void setLastQueryTimeFilter(Filter lastQueryTimeFilter) {
+   this.lastQueryTimeFilter = lastQueryTimeFilter;
+ }
+
+ /** Sets whether the operators planned for a last query should update the last cache. */
+ public void setNeedUpdateLastCache(boolean needUpdateLastCache) {
+   this.needUpdateLastCache = needUpdateLastCache;
+ }
+
+ /**
+  * Records a last value that was answered from the cache together with its full path. The plan
+  * node id of the first recorded value is remembered as {@code firstCachedPlanNodeId}.
+  */
+ public void addCachedLastValue(
+     TimeValuePair timeValuePair, PlanNodeId planNodeId, String fullPath) {
+   final boolean isFirstCachedValue = (cachedLastValueList == null);
+   if (isFirstCachedValue) {
+     // both lists are created together so that their indexes stay aligned
+     firstCachedPlanNodeId = planNodeId;
+     cachedLastValuePathList = new ArrayList<>();
+     cachedLastValueList = new ArrayList<>();
+   }
+   cachedLastValuePathList.add(fullPath);
+   cachedLastValueList.add(timeValuePair);
+ }
+
+ /** Returns the recorded cached last values, or {@code null} when none has been recorded. */
+ public List<TimeValuePair> getCachedLastValueList() {
+   return cachedLastValueList;
+ }
+
public ISinkHandle getSinkHandle() {
return sinkHandle;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
index e8223a42d0..5923bc8942 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
@@ -22,6 +22,11 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -35,14 +40,18 @@ public class LastQueryMergeNode extends ProcessNode {
// make sure child in list has been ordered by their sensor name
private List<PlanNode> children;
- public LastQueryMergeNode(PlanNodeId id) {
+ private final Filter timeFilter;
+
+ public LastQueryMergeNode(PlanNodeId id, Filter timeFilter) {
super(id);
this.children = new ArrayList<>();
+ this.timeFilter = timeFilter;
}
- public LastQueryMergeNode(PlanNodeId id, List<PlanNode> children) {
+ public LastQueryMergeNode(PlanNodeId id, List<PlanNode> children, Filter
timeFilter) {
super(id);
this.children = children;
+ this.timeFilter = timeFilter;
}
@Override
@@ -57,7 +66,7 @@ public class LastQueryMergeNode extends ProcessNode {
@Override
public PlanNode clone() {
- return new LastQueryMergeNode(getPlanNodeId());
+ return new LastQueryMergeNode(getPlanNodeId(), timeFilter);
}
@Override
@@ -97,14 +106,29 @@ public class LastQueryMergeNode extends ProcessNode {
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.LAST_QUERY_MERGE.serialize(byteBuffer);
+ if (timeFilter == null) {
+ ReadWriteIOUtils.write((byte) 0, byteBuffer);
+ } else {
+ ReadWriteIOUtils.write((byte) 1, byteBuffer);
+ timeFilter.serialize(byteBuffer);
+ }
}
public static LastQueryMergeNode deserialize(ByteBuffer byteBuffer) {
+ Filter timeFilter = null;
+ if (!ReadWriteIOUtils.readIsNull(byteBuffer)) {
+ timeFilter = FilterFactory.deserialize(byteBuffer);
+ }
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new LastQueryMergeNode(planNodeId);
+ return new LastQueryMergeNode(planNodeId, timeFilter);
}
public void setChildren(List<PlanNode> children) {
this.children = children;
}
+
+ @Nullable
+ public Filter getTimeFilter() {
+ return timeFilter;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
index 837bdbace4..dced79aa0a 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
@@ -25,14 +25,9 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
-import javax.annotation.Nullable;
-
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
@@ -43,25 +38,18 @@ public class AlignedLastQueryScanNode extends SourceNode {
// The path of the target series which will be scanned.
private final AlignedPath seriesPath;
- private Filter timeFilter;
-
// The id of DataRegion where the node will run
private TRegionReplicaSet regionReplicaSet;
- public AlignedLastQueryScanNode(PlanNodeId id, AlignedPath seriesPath,
Filter timeFilter) {
+ public AlignedLastQueryScanNode(PlanNodeId id, AlignedPath seriesPath) {
super(id);
this.seriesPath = seriesPath;
- this.timeFilter = timeFilter;
}
public AlignedLastQueryScanNode(
- PlanNodeId id,
- AlignedPath seriesPath,
- Filter timeFilter,
- TRegionReplicaSet regionReplicaSet) {
+ PlanNodeId id, AlignedPath seriesPath, TRegionReplicaSet
regionReplicaSet) {
super(id);
this.seriesPath = seriesPath;
- this.timeFilter = timeFilter;
this.regionReplicaSet = regionReplicaSet;
}
@@ -93,7 +81,7 @@ public class AlignedLastQueryScanNode extends SourceNode {
@Override
public PlanNode clone() {
- return new AlignedLastQueryScanNode(getPlanNodeId(), seriesPath,
timeFilter, regionReplicaSet);
+ return new AlignedLastQueryScanNode(getPlanNodeId(), seriesPath,
regionReplicaSet);
}
@Override
@@ -118,13 +106,12 @@ public class AlignedLastQueryScanNode extends SourceNode {
if (!super.equals(o)) return false;
AlignedLastQueryScanNode that = (AlignedLastQueryScanNode) o;
return Objects.equals(seriesPath, that.seriesPath)
- && Objects.equals(timeFilter, that.timeFilter)
&& Objects.equals(regionReplicaSet, that.regionReplicaSet);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), seriesPath, timeFilter,
regionReplicaSet);
+ return Objects.hash(super.hashCode(), seriesPath, regionReplicaSet);
}
@Override
@@ -138,34 +125,15 @@ public class AlignedLastQueryScanNode extends SourceNode {
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.ALIGNED_LAST_QUERY_SCAN.serialize(byteBuffer);
seriesPath.serialize(byteBuffer);
- if (timeFilter == null) {
- ReadWriteIOUtils.write((byte) 0, byteBuffer);
- } else {
- ReadWriteIOUtils.write((byte) 1, byteBuffer);
- timeFilter.serialize(byteBuffer);
- }
}
public static AlignedLastQueryScanNode deserialize(ByteBuffer byteBuffer) {
AlignedPath partialPath = (AlignedPath)
PathDeserializeUtil.deserialize(byteBuffer);
- Filter timeFilter = null;
- if (!ReadWriteIOUtils.readIsNull(byteBuffer)) {
- timeFilter = FilterFactory.deserialize(byteBuffer);
- }
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new AlignedLastQueryScanNode(planNodeId, partialPath, timeFilter);
+ return new AlignedLastQueryScanNode(planNodeId, partialPath);
}
public AlignedPath getSeriesPath() {
return seriesPath;
}
-
- @Nullable
- public Filter getTimeFilter() {
- return timeFilter;
- }
-
- public void setTimeFilter(@Nullable Filter timeFilter) {
- this.timeFilter = timeFilter;
- }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
index 546c357e82..ab20f9548c 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
@@ -25,14 +25,9 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
-import javax.annotation.Nullable;
-
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
@@ -45,25 +40,18 @@ public class LastQueryScanNode extends SourceNode {
// The path of the target series which will be scanned.
private final MeasurementPath seriesPath;
- private Filter timeFilter;
-
// The id of DataRegion where the node will run
private TRegionReplicaSet regionReplicaSet;
- public LastQueryScanNode(PlanNodeId id, MeasurementPath seriesPath, Filter
timeFilter) {
+ public LastQueryScanNode(PlanNodeId id, MeasurementPath seriesPath) {
super(id);
this.seriesPath = seriesPath;
- this.timeFilter = timeFilter;
}
public LastQueryScanNode(
- PlanNodeId id,
- MeasurementPath seriesPath,
- Filter timeFilter,
- TRegionReplicaSet regionReplicaSet) {
+ PlanNodeId id, MeasurementPath seriesPath, TRegionReplicaSet
regionReplicaSet) {
super(id);
this.seriesPath = seriesPath;
- this.timeFilter = timeFilter;
this.regionReplicaSet = regionReplicaSet;
}
@@ -84,15 +72,6 @@ public class LastQueryScanNode extends SourceNode {
return seriesPath;
}
- @Nullable
- public Filter getTimeFilter() {
- return timeFilter;
- }
-
- public void setTimeFilter(@Nullable Filter timeFilter) {
- this.timeFilter = timeFilter;
- }
-
@Override
public void close() throws Exception {}
@@ -108,7 +87,7 @@ public class LastQueryScanNode extends SourceNode {
@Override
public PlanNode clone() {
- return new LastQueryScanNode(getPlanNodeId(), seriesPath, timeFilter,
regionReplicaSet);
+ return new LastQueryScanNode(getPlanNodeId(), seriesPath,
regionReplicaSet);
}
@Override
@@ -133,13 +112,12 @@ public class LastQueryScanNode extends SourceNode {
if (!super.equals(o)) return false;
LastQueryScanNode that = (LastQueryScanNode) o;
return Objects.equals(seriesPath, that.seriesPath)
- && Objects.equals(timeFilter, that.timeFilter)
&& Objects.equals(regionReplicaSet, that.regionReplicaSet);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), seriesPath, timeFilter,
regionReplicaSet);
+ return Objects.hash(super.hashCode(), seriesPath, regionReplicaSet);
}
@Override
@@ -153,21 +131,11 @@ public class LastQueryScanNode extends SourceNode {
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.LAST_QUERY_SCAN.serialize(byteBuffer);
seriesPath.serialize(byteBuffer);
- if (timeFilter == null) {
- ReadWriteIOUtils.write((byte) 0, byteBuffer);
- } else {
- ReadWriteIOUtils.write((byte) 1, byteBuffer);
- timeFilter.serialize(byteBuffer);
- }
}
public static LastQueryScanNode deserialize(ByteBuffer byteBuffer) {
MeasurementPath partialPath = (MeasurementPath)
PathDeserializeUtil.deserialize(byteBuffer);
- Filter timeFilter = null;
- if (!ReadWriteIOUtils.readIsNull(byteBuffer)) {
- timeFilter = FilterFactory.deserialize(byteBuffer);
- }
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new LastQueryScanNode(planNodeId, partialPath, timeFilter);
+ return new LastQueryScanNode(planNodeId, partialPath);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index 62773b5ed9..af9526b142 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -67,6 +67,7 @@ import java.util.stream.Collectors;
import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE;
import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_VALUE;
+import static
org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil.satisfyFilter;
public class LastQueryExecutor {
@@ -379,10 +380,6 @@ public class LastQueryExecutor {
}
}
- private static boolean satisfyFilter(Filter filter, TimeValuePair tvPair) {
- return filter == null || filter.satisfy(tvPair.getTimestamp(),
tvPair.getValue().getValue());
- }
-
public static void clear() {
ID_TABLE_ENABLED =
IoTDBDescriptor.getInstance().getConfig().isEnableIDTable();
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastCacheScanOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastCacheScanOperatorTest.java
new file mode 100644
index 0000000000..72ee86fb65
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastCacheScanOperatorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+
+import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for {@link LastCacheScanOperator}.
+ *
+ * <p>The operator is constructed from an already built {@link TsBlock}; the test checks that the
+ * block is handed back with the same content exactly once and that the operator then reports that
+ * it is finished.
+ */
+public class LastCacheScanOperatorTest {
+
+  /** Builds a six-row last-value block, wraps it in the operator and verifies a single pass. */
+  @Test
+  public void batchTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      // Register the context under the name of the operator that is actually being tested.
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, LastCacheScanOperator.class.getSimpleName());
+
+      TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(6);
+
+      // One row per data type name; the timestamps are deliberately not monotonic.
+      LastQueryUtil.appendLastValue(builder, 1, "root.sg.d.s1", "true", "BOOLEAN");
+      LastQueryUtil.appendLastValue(builder, 2, "root.sg.d.s2", "2", "INT32");
+      LastQueryUtil.appendLastValue(builder, 3, "root.sg.d.s3", "3", "INT64");
+      LastQueryUtil.appendLastValue(builder, 4, "root.sg.d.s4", "4.4", "FLOAT");
+      LastQueryUtil.appendLastValue(builder, 3, "root.sg.d.s5", "3.3", "DOUBLE");
+      LastQueryUtil.appendLastValue(builder, 1, "root.sg.d.s6", "peace", "TEXT");
+
+      TsBlock tsBlock = builder.build();
+
+      LastCacheScanOperator lastCacheScanOperator =
+          new LastCacheScanOperator(
+              fragmentInstanceContext.getOperatorContexts().get(0), planNodeId1, tsBlock);
+
+      // The operator must be ready immediately and have exactly one block to return.
+      assertTrue(lastCacheScanOperator.isBlocked().isDone());
+      assertTrue(lastCacheScanOperator.hasNext());
+      TsBlock result = lastCacheScanOperator.next();
+
+      // Compare the returned block with the input block, row by row and column by column.
+      assertEquals(tsBlock.getPositionCount(), result.getPositionCount());
+      assertEquals(tsBlock.getValueColumnCount(), result.getValueColumnCount());
+      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+        assertEquals(tsBlock.getTimeByIndex(i), result.getTimeByIndex(i));
+        for (int j = 0; j < tsBlock.getValueColumnCount(); j++) {
+          assertEquals(tsBlock.getColumn(j).getBinary(i), result.getColumn(j).getBinary(i));
+        }
+      }
+
+      // After the single block has been consumed the operator is exhausted.
+      assertFalse(lastCacheScanOperator.hasNext());
+      assertTrue(lastCacheScanOperator.isFinished());
+
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
new file mode 100644
index 0000000000..03fa1a1584
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import
org.apache.iotdb.db.mpp.execution.operator.process.LastQueryMergeOperator;
+import
org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
+import
org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for {@link LastQueryMergeOperator}.
+ *
+ * <p>Each test feeds the merge operator a list of child operators and checks that the rows come
+ * back in child order, one series per row, with time {@code 499}, value {@code "10499"} and data
+ * type {@code INT32}.
+ */
+public class LastQueryMergeOperatorTest {
+
+  private static final String SERIES_SCAN_OPERATOR_TEST_SG = "root.LastQueryMergeOperatorTest";
+
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+  private ExecutorService instanceNotificationExecutor;
+
+  /** Prepares the test files through {@link SeriesReaderTestUtil} and the notification pool. */
+  @Before
+  public void setUp() throws MetadataException, IOException, WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unSeqResources, SERIES_SCAN_OPERATOR_TEST_SG);
+    this.instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+  }
+
+  /** Removes the test files and stops the notification pool. */
+  @After
+  public void tearDown() throws IOException {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+    instanceNotificationExecutor.shutdown();
+  }
+
+  /** Merges two {@link UpdateLastCacheOperator} children (sensor0 and sensor1). */
+  @Test
+  public void testLastQueryMergeOperatorTestWithoutCachedValue() {
+    try {
+      Set<String> allSensors = Sets.newHashSet("sensor0", "sensor1");
+      FragmentInstanceContext fragmentInstanceContext = newFragmentInstanceContext();
+
+      // Operator ids 1-2 and 3-4 are taken by the two scan/update pairs.
+      UpdateLastCacheOperator updateLastCacheOperator1 =
+          newUpdateLastCacheOperator(fragmentInstanceContext, 1, "sensor0", allSensors);
+      UpdateLastCacheOperator updateLastCacheOperator2 =
+          newUpdateLastCacheOperator(fragmentInstanceContext, 3, "sensor1", allSensors);
+
+      PlanNodeId planNodeId5 = new PlanNodeId("5");
+      fragmentInstanceContext.addOperatorContext(
+          5, planNodeId5, LastQueryMergeOperator.class.getSimpleName());
+
+      LastQueryMergeOperator lastQueryMergeOperator =
+          new LastQueryMergeOperator(
+              fragmentInstanceContext.getOperatorContexts().get(4),
+              ImmutableList.of(updateLastCacheOperator1, updateLastCacheOperator2));
+
+      // One row per child; without this check an operator returning no rows would pass.
+      assertEquals(2, verifyMergedRows(lastQueryMergeOperator));
+
+    } catch (IllegalPathException e) {
+      fail("Unexpected IllegalPathException: " + e.getMessage());
+    }
+  }
+
+  /**
+   * Merges two {@link UpdateLastCacheOperator} children (sensor0 and sensor1) with one {@link
+   * LastCacheScanOperator} child that carries three pre-built rows (sensor2 to sensor4).
+   */
+  @Test
+  public void testUpdateLastCacheOperatorTestWithCachedValue() {
+    try {
+      Set<String> allSensors = Sets.newHashSet("sensor0", "sensor1");
+      FragmentInstanceContext fragmentInstanceContext = newFragmentInstanceContext();
+
+      // Operator ids 1-2 and 3-4 are taken by the two scan/update pairs.
+      UpdateLastCacheOperator updateLastCacheOperator1 =
+          newUpdateLastCacheOperator(fragmentInstanceContext, 1, "sensor0", allSensors);
+      UpdateLastCacheOperator updateLastCacheOperator2 =
+          newUpdateLastCacheOperator(fragmentInstanceContext, 3, "sensor1", allSensors);
+
+      // The cache-scan context is registered with the same plan node id the operator is built
+      // with below.
+      PlanNodeId planNodeId5 = new PlanNodeId("5");
+      fragmentInstanceContext.addOperatorContext(
+          5, planNodeId5, LastCacheScanOperator.class.getSimpleName());
+
+      PlanNodeId planNodeId6 = new PlanNodeId("6");
+      fragmentInstanceContext.addOperatorContext(
+          6, planNodeId6, LastQueryMergeOperator.class.getSimpleName());
+
+      // Rows for sensor2..sensor4 use the same time/value/type as the scanned series so that the
+      // shared verification applies to every merged row.
+      TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(6);
+      for (int sensor = 2; sensor <= 4; sensor++) {
+        LastQueryUtil.appendLastValue(
+            builder,
+            499,
+            SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor" + sensor,
+            "10499",
+            "INT32");
+      }
+      TsBlock tsBlock = builder.build();
+
+      LastCacheScanOperator lastCacheScanOperator =
+          new LastCacheScanOperator(
+              fragmentInstanceContext.getOperatorContexts().get(4), planNodeId5, tsBlock);
+
+      LastQueryMergeOperator lastQueryMergeOperator =
+          new LastQueryMergeOperator(
+              fragmentInstanceContext.getOperatorContexts().get(5),
+              ImmutableList.of(
+                  updateLastCacheOperator1, updateLastCacheOperator2, lastCacheScanOperator));
+
+      // Two scanned rows plus three cached rows.
+      assertEquals(5, verifyMergedRows(lastQueryMergeOperator));
+
+    } catch (IllegalPathException e) {
+      fail("Unexpected IllegalPathException: " + e.getMessage());
+    }
+  }
+
+  /** Creates a fresh fragment instance context backed by the per-test notification executor. */
+  private FragmentInstanceContext newFragmentInstanceContext() {
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+    return createFragmentInstanceContext(instanceId, stateMachine);
+  }
+
+  /**
+   * Registers two consecutive operator contexts ({@code scanOperatorId} for the aggregation scan,
+   * {@code scanOperatorId + 1} for the cache update) and builds the corresponding operator pair
+   * for an INT32 series of {@code device0}.
+   *
+   * <p>Contexts are looked up by list position, so operator ids must be registered sequentially
+   * starting at 1: the context of operator id {@code k} is then found at index {@code k - 1}.
+   *
+   * @param context fragment instance context the operator contexts are added to
+   * @param scanOperatorId operator id (and plan node id) of the scan operator
+   * @param sensor measurement name under {@code device0}, e.g. {@code "sensor0"}
+   * @param allSensors sensor set passed to the scan operator
+   * @return the update operator wrapping the newly created scan operator
+   * @throws IllegalPathException if the series path cannot be constructed
+   */
+  private UpdateLastCacheOperator newUpdateLastCacheOperator(
+      FragmentInstanceContext context, int scanOperatorId, String sensor, Set<String> allSensors)
+      throws IllegalPathException {
+    int updateOperatorId = scanOperatorId + 1;
+    List<Aggregator> aggregators = LastQueryUtil.createAggregators(TSDataType.INT32);
+    MeasurementPath measurementPath =
+        new MeasurementPath(
+            SERIES_SCAN_OPERATOR_TEST_SG + ".device0." + sensor, TSDataType.INT32);
+
+    PlanNodeId scanNodeId = new PlanNodeId(String.valueOf(scanOperatorId));
+    context.addOperatorContext(
+        scanOperatorId, scanNodeId, SeriesAggregationScanOperator.class.getSimpleName());
+    PlanNodeId updateNodeId = new PlanNodeId(String.valueOf(updateOperatorId));
+    context.addOperatorContext(
+        updateOperatorId, updateNodeId, UpdateLastCacheOperator.class.getSimpleName());
+
+    SeriesAggregationScanOperator seriesAggregationScanOperator =
+        new SeriesAggregationScanOperator(
+            scanNodeId,
+            measurementPath,
+            allSensors,
+            context.getOperatorContexts().get(scanOperatorId - 1),
+            aggregators,
+            null,
+            false,
+            null);
+    seriesAggregationScanOperator.initQueryDataSource(
+        new QueryDataSource(seqResources, unSeqResources));
+
+    return new UpdateLastCacheOperator(
+        context.getOperatorContexts().get(updateOperatorId - 1),
+        seriesAggregationScanOperator,
+        measurementPath,
+        measurementPath.getSeriesType(),
+        null,
+        false);
+  }
+
+  /**
+   * Drains the merge operator and verifies every returned row: three value columns, time {@code
+   * 499}, series name {@code device0.sensor<rowIndex>}, value {@code "10499"} and type {@code
+   * INT32}. {@code null} blocks returned by the operator are skipped.
+   *
+   * @param lastQueryMergeOperator operator to drain
+   * @return the total number of rows that were returned and verified
+   */
+  private static int verifyMergedRows(LastQueryMergeOperator lastQueryMergeOperator) {
+    int count = 0;
+    while (!lastQueryMergeOperator.isFinished()) {
+      assertTrue(lastQueryMergeOperator.isBlocked().isDone());
+      assertTrue(lastQueryMergeOperator.hasNext());
+      TsBlock result = lastQueryMergeOperator.next();
+      if (result == null) {
+        continue;
+      }
+      assertEquals(3, result.getValueColumnCount());
+
+      for (int i = 0; i < result.getPositionCount(); i++) {
+        assertEquals(499, result.getTimeByIndex(i));
+        assertEquals(
+            SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor" + count,
+            result.getColumn(0).getBinary(i).toString());
+        assertEquals("10499", result.getColumn(1).getBinary(i).toString());
+        assertEquals(TSDataType.INT32.name(), result.getColumn(2).getBinary(i).toString());
+        count++;
+      }
+    }
+    return count;
+  }
+}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
index 0c62a46328..7e95c4ac2c 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import
org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
-import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -479,7 +478,7 @@ public class SeriesAggregationScanOperatorTest {
createFragmentInstanceContext(instanceId, stateMachine);
PlanNodeId planNodeId = new PlanNodeId("1");
fragmentInstanceContext.addOperatorContext(
- 1, planNodeId, SeriesScanOperator.class.getSimpleName());
+ 1, planNodeId, SeriesAggregationScanOperator.class.getSimpleName());
SeriesAggregationScanOperator seriesAggregationScanOperator =
new SeriesAggregationScanOperator(
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
new file mode 100644
index 0000000000..b78cb7b1ac
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import
org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
+import
org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class UpdateLastCacheOperatorTest {
+
+ private static final String SERIES_SCAN_OPERATOR_TEST_SG =
"root.UpdateLastCacheOperator";
+ private final List<String> deviceIds = new ArrayList<>();
+ private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+ private final List<TsFileResource> seqResources = new ArrayList<>();
+ private final List<TsFileResource> unSeqResources = new ArrayList<>();
+ private ExecutorService instanceNotificationExecutor;
+
+ @Before
+ public void setUp() throws MetadataException, IOException,
WriteProcessException {
+ SeriesReaderTestUtil.setUp(
+ measurementSchemas, deviceIds, seqResources, unSeqResources,
SERIES_SCAN_OPERATOR_TEST_SG);
+ this.instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+ instanceNotificationExecutor.shutdown();
+ }
+
+ @Test
+ public void testUpdateLastCacheOperatorTestWithoutTimeFilter() {
+ try {
+ List<Aggregator> aggregators =
LastQueryUtil.createAggregators(TSDataType.INT32);
+ UpdateLastCacheOperator updateLastCacheOperator =
+ initUpdateLastCacheOperator(aggregators, null, false, null);
+
+ assertTrue(updateLastCacheOperator.isBlocked().isDone());
+ assertTrue(updateLastCacheOperator.hasNext());
+ TsBlock result = updateLastCacheOperator.next();
+ assertEquals(1, result.getPositionCount());
+ assertEquals(3, result.getValueColumnCount());
+
+ assertEquals(499, result.getTimeByIndex(0));
+ assertEquals(
+ SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0",
+ result.getColumn(0).getBinary(0).toString());
+ assertEquals("10499", result.getColumn(1).getBinary(0).toString());
+ assertEquals(TSDataType.INT32.name(),
result.getColumn(2).getBinary(0).toString());
+
+ assertFalse(updateLastCacheOperator.hasNext());
+ assertTrue(updateLastCacheOperator.isFinished());
+
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void testUpdateLastCacheOperatorTestWithTimeFilter1() {
+ try {
+ List<Aggregator> aggregators =
LastQueryUtil.createAggregators(TSDataType.INT32);
+ Filter timeFilter = TimeFilter.gtEq(200);
+ UpdateLastCacheOperator updateLastCacheOperator =
+ initUpdateLastCacheOperator(aggregators, timeFilter, false, null);
+
+ assertTrue(updateLastCacheOperator.isBlocked().isDone());
+ assertTrue(updateLastCacheOperator.hasNext());
+ TsBlock result = updateLastCacheOperator.next();
+ assertEquals(1, result.getPositionCount());
+ assertEquals(3, result.getValueColumnCount());
+
+ assertEquals(499, result.getTimeByIndex(0));
+ assertEquals(
+ SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0",
+ result.getColumn(0).getBinary(0).toString());
+ assertEquals("10499", result.getColumn(1).getBinary(0).toString());
+ assertEquals(TSDataType.INT32.name(),
result.getColumn(2).getBinary(0).toString());
+
+ assertFalse(updateLastCacheOperator.hasNext());
+ assertTrue(updateLastCacheOperator.isFinished());
+
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void testUpdateLastCacheOperatorTestWithTimeFilter2() {
+ try {
+ List<Aggregator> aggregators =
LastQueryUtil.createAggregators(TSDataType.INT32);
+ Filter timeFilter = TimeFilter.ltEq(120);
+ UpdateLastCacheOperator updateLastCacheOperator =
+ initUpdateLastCacheOperator(aggregators, timeFilter, false, null);
+
+ assertTrue(updateLastCacheOperator.isBlocked().isDone());
+ assertTrue(updateLastCacheOperator.hasNext());
+ TsBlock result = updateLastCacheOperator.next();
+ assertEquals(1, result.getPositionCount());
+ assertEquals(3, result.getValueColumnCount());
+
+ assertEquals(120, result.getTimeByIndex(0));
+ assertEquals(
+ SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0",
+ result.getColumn(0).getBinary(0).toString());
+ assertEquals("20120", result.getColumn(1).getBinary(0).toString());
+ assertEquals(TSDataType.INT32.name(),
result.getColumn(2).getBinary(0).toString());
+
+ assertFalse(updateLastCacheOperator.hasNext());
+ assertTrue(updateLastCacheOperator.isFinished());
+
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ public UpdateLastCacheOperator initUpdateLastCacheOperator(
+ List<Aggregator> aggregators,
+ Filter timeFilter,
+ boolean ascending,
+ GroupByTimeParameter groupByTimeParameter)
+ throws IllegalPathException {
+ MeasurementPath measurementPath =
+ new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0",
TSDataType.INT32);
+ Set<String> allSensors = Sets.newHashSet("sensor0");
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId1, SeriesAggregationScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId2 = new PlanNodeId("2");
+ fragmentInstanceContext.addOperatorContext(
+ 2, planNodeId2, UpdateLastCacheOperator.class.getSimpleName());
+
+ SeriesAggregationScanOperator seriesAggregationScanOperator =
+ new SeriesAggregationScanOperator(
+ planNodeId1,
+ measurementPath,
+ allSensors,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ aggregators,
+ timeFilter,
+ ascending,
+ groupByTimeParameter);
+ seriesAggregationScanOperator.initQueryDataSource(
+ new QueryDataSource(seqResources, unSeqResources));
+
+ return new UpdateLastCacheOperator(
+ fragmentInstanceContext.getOperatorContexts().get(1),
+ seriesAggregationScanOperator,
+ measurementPath,
+ measurementPath.getSeriesType(),
+ null,
+ false);
+ }
+}