This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1a64ec2a225 Impl visitTreeNonAlignedDeviceViewScan
1a64ec2a225 is described below
commit 1a64ec2a225e4ded26b1327841cda4f298dd044a
Author: shuwenwei <[email protected]>
AuthorDate: Fri Apr 18 11:34:41 2025 +0800
Impl visitTreeNonAlignedDeviceViewScan
---
.../operator/process/FilterAndProjectOperator.java | 15 +
.../execution/operator/process/LimitOperator.java | 6 +
.../execution/operator/process/OffsetOperator.java | 6 +
.../process/join/LeftOuterTimeJoinOperator.java | 15 +-
.../join/TableLeftOuterTimeJoinOperator.java | 55 ++
.../relational/AbstractTableScanOperator.java | 56 +-
.../relational/DeviceIteratorScanOperator.java | 233 ++++++++
.../MeasurementToTableViewAdaptorUtils.java | 103 ++++
.../TreeToTableViewAdaptorOperator.java} | 90 +--
.../plan/planner/TableOperatorGenerator.java | 575 +++++++++++++++---
.../operator/DeviceIteratorScanOperatorTest.java | 228 ++++++++
...nAlignedTreeDeviceViewScanOperatorTreeTest.java | 650 +++++++++++++++++++++
.../TreeToTableViewAdaptorOperatorTest.java | 237 ++++++++
13 files changed, 2106 insertions(+), 163 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/FilterAndProjectOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/FilterAndProjectOperator.java
index c759c3a0fdf..06ca6d6d2ce 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/FilterAndProjectOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/FilterAndProjectOperator.java
@@ -100,6 +100,21 @@ public class FilterAndProjectOperator implements
ProcessOperator {
this.hasFilter = hasFilter;
}
+  /**
+   * Creates an operator that reuses the state of {@code filterAndProjectOperator} but reads from
+   * a different input operator.
+   *
+   * <p>All fields other than the input operator are copied by reference from the given operator,
+   * so the new instance shares the same transformer lists and the same filter block builder.
+   *
+   * @param filterAndProjectOperator the operator whose fields are reused
+   * @param inputOperator the input operator of the new instance
+   */
+  public FilterAndProjectOperator(
+      FilterAndProjectOperator filterAndProjectOperator, Operator inputOperator) {
+    final FilterAndProjectOperator source = filterAndProjectOperator;
+
+    // The only field that is not taken from the source operator.
+    this.inputOperator = inputOperator;
+
+    this.operatorContext = source.operatorContext;
+    this.hasFilter = source.hasFilter;
+    this.hasNonMappableUDF = source.hasNonMappableUDF;
+
+    // Filter side.
+    this.filterLeafColumnTransformerList = source.filterLeafColumnTransformerList;
+    this.filterOutputTransformer = source.filterOutputTransformer;
+    this.filterTsBlockBuilder = source.filterTsBlockBuilder;
+
+    // Projection side.
+    this.commonTransformerList = source.commonTransformerList;
+    this.projectLeafColumnTransformerList = source.projectLeafColumnTransformerList;
+    this.projectOutputTransformerList = source.projectOutputTransformerList;
+  }
+
@Override
public OperatorContext getOperatorContext() {
return operatorContext;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/LimitOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/LimitOperator.java
index 6fdfdbed554..85ef22ababd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/LimitOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/LimitOperator.java
@@ -45,6 +45,12 @@ public class LimitOperator implements ProcessOperator {
this.child = requireNonNull(child, "child operator is null");
}
+  /**
+   * Creates a copy of {@code limitOperator} that reads from a different child.
+   *
+   * <p>The operator context and the remaining limit are taken from {@code limitOperator} as they
+   * are at the time of the call; only the child is replaced.
+   *
+   * @param limitOperator the operator whose context and remaining limit are reused
+   * @param child the child operator of the new instance, must not be null
+   * @throws NullPointerException if {@code limitOperator} or {@code child} is null
+   */
+  public LimitOperator(LimitOperator limitOperator, Operator child) {
+    this.operatorContext = limitOperator.operatorContext;
+    this.remainingLimit = limitOperator.remainingLimit;
+    // Same null check and message as the other constructor of this class, so that a null child
+    // fails here rather than on first use.
+    this.child = requireNonNull(child, "child operator is null");
+  }
+
@Override
public OperatorContext getOperatorContext() {
return operatorContext;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/OffsetOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/OffsetOperator.java
index a8581466f89..24fc72d0941 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/OffsetOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/OffsetOperator.java
@@ -45,6 +45,12 @@ public class OffsetOperator implements ProcessOperator {
this.child = requireNonNull(child, "child operator is null");
}
+  /**
+   * Creates a copy of {@code offsetOperator} that reads from a different child.
+   *
+   * <p>The operator context and the remaining offset are taken from {@code offsetOperator} as
+   * they are at the time of the call; only the child is replaced.
+   *
+   * @param offsetOperator the operator whose context and remaining offset are reused
+   * @param child the child operator of the new instance, must not be null
+   * @throws NullPointerException if {@code offsetOperator} or {@code child} is null
+   */
+  public OffsetOperator(OffsetOperator offsetOperator, Operator child) {
+    this.operatorContext = offsetOperator.operatorContext;
+    this.remainingOffset = offsetOperator.remainingOffset;
+    // Same null check and message as the other constructor of this class, so that a null child
+    // fails here rather than on first use.
+    this.child = requireNonNull(child, "child operator is null");
+  }
+
@Override
public OperatorContext getOperatorContext() {
return operatorContext;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java
index 0607d0a1e35..8460b682e24 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java
@@ -54,7 +54,7 @@ public class LeftOuterTimeJoinOperator implements
ProcessOperator {
private final TsBlockBuilder resultBuilder;
private final Operator left;
- private final int leftColumnCount;
+ protected final int leftColumnCount;
private TsBlock leftTsBlock;
@@ -194,7 +194,7 @@ public class LeftOuterTimeJoinOperator implements
ProcessOperator {
private void appendLeftTableRow() {
for (int i = 0; i < leftColumnCount; i++) {
Column leftColumn = leftTsBlock.getColumn(i);
- ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i);
+ ColumnBuilder columnBuilder =
resultBuilder.getColumnBuilder(getOutputColumnIndex(i, true));
if (leftColumn.isNull(leftIndex)) {
columnBuilder.appendNull();
} else {
@@ -231,7 +231,8 @@ public class LeftOuterTimeJoinOperator implements
ProcessOperator {
// right table has this time, append right table's corresponding row
for (int i = leftColumnCount; i < outputColumnCount; i++) {
Column rightColumn = rightTsBlock.getColumn(i - leftColumnCount);
- ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i);
+ ColumnBuilder columnBuilder =
+ resultBuilder.getColumnBuilder(getOutputColumnIndex(i, false));
if (rightColumn.isNull(rightIndex)) {
columnBuilder.appendNull();
} else {
@@ -273,7 +274,7 @@ public class LeftOuterTimeJoinOperator implements
ProcessOperator {
private void appendValueColumnForLeftTable(int rowSize) {
for (int i = 0; i < leftColumnCount; i++) {
- ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i);
+ ColumnBuilder columnBuilder =
resultBuilder.getColumnBuilder(getOutputColumnIndex(i, true));
Column valueColumn = leftTsBlock.getColumn(i);
if (valueColumn.mayHaveNull()) {
@@ -296,7 +297,7 @@ public class LeftOuterTimeJoinOperator implements
ProcessOperator {
private void appendNullForRightTable(int rowSize) {
int nullCount = rowSize - leftIndex;
for (int i = leftColumnCount; i < outputColumnCount; i++) {
- ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i);
+ ColumnBuilder columnBuilder =
resultBuilder.getColumnBuilder(getOutputColumnIndex(i, false));
columnBuilder.appendNull(nullCount);
}
}
@@ -321,6 +322,10 @@ public class LeftOuterTimeJoinOperator implements
ProcessOperator {
return !tsBlockIsNotEmpty(leftTsBlock, leftIndex) && left.isFinished();
}
+  /**
+   * Maps an input column index to the index of the column builder in the result block.
+   *
+   * <p>This implementation is the identity mapping. It is {@code protected} so that a subclass
+   * can place columns at different output positions.
+   *
+   * @param inputColumnIndex the loop index used by the append methods of this class: for the left
+   *     table it runs from 0 up to {@code leftColumnCount}, for the right table it runs from
+   *     {@code leftColumnCount} up to {@code outputColumnCount}
+   * @param isLeftTable {@code true} when the column belongs to the left table
+   * @return the index to pass to {@code resultBuilder.getColumnBuilder}
+   */
+  protected int getOutputColumnIndex(int inputColumnIndex, boolean isLeftTable) {
+    return inputColumnIndex;
+  }
+
@Override
public long calculateMaxPeekMemory() {
return Math.max(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/TableLeftOuterTimeJoinOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/TableLeftOuterTimeJoinOperator.java
new file mode 100644
index 00000000000..19b9e2eb4d4
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/TableLeftOuterTimeJoinOperator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.join;
+
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.TimeComparator;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
+
+import org.apache.tsfile.enums.TSDataType;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link LeftOuterTimeJoinOperator} that decides the output position of every input column by
+ * looking it up in a caller-supplied map instead of using the input position directly.
+ */
+public class TableLeftOuterTimeJoinOperator extends LeftOuterTimeJoinOperator {
+
+  // Output column index for each input column. Keys are built below as InputLocation(0, i) for the
+  // left child and InputLocation(1, i) for the right child, where i is the column index within
+  // that child.
+  private final Map<InputLocation, Integer> outputColumnMap;
+
+  /**
+   * @param operatorContext context passed through to the parent operator
+   * @param leftChild left input of the join
+   * @param rightChild right input of the join
+   * @param leftColumnCount number of columns produced by the left input
+   * @param outputColumnMap output column index for every input column of both children
+   * @param dataTypes data types passed through to the parent operator
+   * @param comparator time comparator passed through to the parent operator
+   */
+  public TableLeftOuterTimeJoinOperator(
+      OperatorContext operatorContext,
+      Operator leftChild,
+      Operator rightChild,
+      int leftColumnCount,
+      Map<InputLocation, Integer> outputColumnMap,
+      List<TSDataType> dataTypes,
+      TimeComparator comparator) {
+    super(operatorContext, leftChild, leftColumnCount, rightChild, dataTypes, comparator);
+    this.outputColumnMap = outputColumnMap;
+  }
+
+  /**
+   * Looks up the output position of an input column in {@code outputColumnMap}.
+   *
+   * @param inputColumnIndex for the left table, the column index within the left child; for the
+   *     right table, the column index within the right child plus {@code leftColumnCount}
+   * @param isLeftTable {@code true} when the column belongs to the left table
+   * @return the mapped output column index
+   * @throws IllegalStateException if the map has no entry for the requested column
+   */
+  @Override
+  protected int getOutputColumnIndex(int inputColumnIndex, boolean isLeftTable) {
+    // Right-table indexes arrive offset by leftColumnCount, so convert them back to a position
+    // within the right child before the lookup.
+    InputLocation location =
+        isLeftTable
+            ? new InputLocation(0, inputColumnIndex)
+            : new InputLocation(1, inputColumnIndex - leftColumnCount);
+    Integer outputColumnIndex = outputColumnMap.get(location);
+    if (outputColumnIndex == null) {
+      // Fail with a descriptive message instead of a bare NullPointerException from unboxing.
+      throw new IllegalStateException(
+          "No output column is mapped for input column "
+              + inputColumnIndex
+              + " of the "
+              + (isLeftTable ? "left" : "right")
+              + " child");
+    }
+    return outputColumnIndex;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java
index f8de2d57572..96de6f92e10 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java
@@ -32,16 +32,11 @@ import
org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
-import org.apache.tsfile.block.column.Column;
-import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.tsfile.read.common.block.column.BinaryColumn;
import org.apache.tsfile.read.common.block.column.LongColumn;
-import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
-import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.write.schema.IMeasurementSchema;
@@ -200,57 +195,18 @@ public abstract class AbstractTableScanOperator extends
AbstractSeriesScanOperat
}
private void constructResultTsBlock() {
- int positionCount = measurementDataBlock.getPositionCount();
DeviceEntry currentDeviceEntry = deviceEntries.get(currentDeviceIndex);
- Column[] valueColumns = new Column[columnsIndexArray.length];
- for (int i = 0; i < columnsIndexArray.length; i++) {
- switch (columnSchemas.get(i).getColumnCategory()) {
- case TAG:
- String idColumnValue = getNthIdColumnValue(currentDeviceEntry,
columnsIndexArray[i]);
-
- valueColumns[i] =
- getIdOrAttributeValueColumn(
- idColumnValue == null
- ? null
- : new Binary(idColumnValue, TSFileConfig.STRING_CHARSET),
- positionCount);
- break;
- case ATTRIBUTE:
- Binary attributeColumnValue =
-
currentDeviceEntry.getAttributeColumnValues()[columnsIndexArray[i]];
- valueColumns[i] = getIdOrAttributeValueColumn(attributeColumnValue,
positionCount);
- break;
- case FIELD:
- valueColumns[i] =
measurementDataBlock.getColumn(columnsIndexArray[i]);
- break;
- case TIME:
- valueColumns[i] = measurementDataBlock.getTimeColumn();
- break;
- default:
- throw new IllegalArgumentException(
- "Unexpected column category: " +
columnSchemas.get(i).getColumnCategory());
- }
- }
this.resultTsBlock =
- new TsBlock(
- positionCount,
- new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, positionCount),
- valueColumns);
+ MeasurementToTableViewAdaptorUtils.toTableBlock(
+ measurementDataBlock,
+ columnsIndexArray,
+ columnSchemas,
+ deviceEntries.get(currentDeviceIndex),
+ idColumnIndex -> getNthIdColumnValue(currentDeviceEntry,
idColumnIndex));
}
abstract String getNthIdColumnValue(DeviceEntry deviceEntry, int
idColumnIndex);
- private RunLengthEncodedColumn getIdOrAttributeValueColumn(Binary value, int
positionCount) {
- if (value == null) {
- return new RunLengthEncodedColumn(
- new BinaryColumn(1, Optional.of(new boolean[] {true}), new Binary[]
{null}),
- positionCount);
- } else {
- return new RunLengthEncodedColumn(
- new BinaryColumn(1, Optional.empty(), new Binary[] {value}),
positionCount);
- }
- }
-
@Override
public boolean hasNext() throws Exception {
return !isFinished();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java
new file mode 100644
index 00000000000..1689fd2fd84
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * Source operator that processes a list of devices one at a time.
+ *
+ * <p>For every {@link DeviceEntry} it asks a {@link DeviceChildOperatorTreeGenerator} to build an
+ * operator subtree, delegates {@code hasNext}/{@code next}/{@code isBlocked} to the root of that
+ * subtree, and switches to the next device once the root has no more data.
+ *
+ * <p>When the device list is empty no subtree is built and {@code currentDeviceRootOperator} stays
+ * {@code null}; the methods below tolerate that state.
+ */
+public class DeviceIteratorScanOperator extends AbstractDataSourceOperator {
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(DeviceIteratorScanOperator.class);
+
+  private final OperatorContext operatorContext;
+  private final List<DeviceEntry> deviceEntries;
+  private final DeviceChildOperatorTreeGenerator deviceChildOperatorTreeGenerator;
+
+  private QueryDataSource queryDataSource;
+  private int currentDeviceIndex;
+  private Operator currentDeviceRootOperator;
+  private List<Operator> dataSourceOperators;
+  // For each device operator tree, isBlocked needs to be called once.
+  // Calling isBlocked sets this field to true.
+  // Until isBlocked has been called for a device, hasNext returns true and next returns null.
+  private boolean currentDeviceInit;
+
+  /**
+   * Creates the operator and immediately builds the operator subtree of the first device (if
+   * there is one).
+   *
+   * @param operatorContext context of this operator
+   * @param deviceEntries devices to iterate over, in order
+   * @param childOperatorTreeGenerator builds the operator subtree for each device
+   */
+  public DeviceIteratorScanOperator(
+      OperatorContext operatorContext,
+      List<DeviceEntry> deviceEntries,
+      DeviceChildOperatorTreeGenerator childOperatorTreeGenerator) {
+    this.operatorContext = operatorContext;
+    this.deviceEntries = deviceEntries;
+    this.deviceChildOperatorTreeGenerator = childOperatorTreeGenerator;
+    this.currentDeviceIndex = 0;
+    this.currentDeviceInit = false;
+    this.operatorContext.recordSpecifiedInfo(
+        AbstractTableScanOperator.CURRENT_DEVICE_INDEX_STRING, Integer.toString(0));
+    constructCurrentDeviceOperatorTree();
+  }
+
+  /**
+   * Returns whether more data may be produced.
+   *
+   * <p>Note that this method has a side effect: when the current device is exhausted and another
+   * device remains, it switches to that device before returning {@code true}.
+   */
+  @Override
+  public boolean hasNext() throws Exception {
+    if (currentDeviceRootOperator != null && currentDeviceRootOperator.hasNext()) {
+      return true;
+    } else {
+      if (!currentDeviceInit) {
+        // isBlocked has not been called for this device yet; report "more data" so that the
+        // caller keeps driving the operator (next() returns null in this state).
+        return true;
+      }
+      if (currentDeviceIndex + 1 >= deviceEntries.size()) {
+        return false;
+      } else {
+        nextDevice();
+        return true;
+      }
+    }
+  }
+
+  @Override
+  public boolean isFinished() throws Exception {
+    return !hasNext();
+  }
+
+  /**
+   * Advances to the next device: closes the operator designated by the generator, builds the
+   * subtree of the next device, then resets the query data source and hands it to the new
+   * subtree's data source operators.
+   */
+  private void nextDevice() throws Exception {
+    currentDeviceIndex++;
+    deviceChildOperatorTreeGenerator.getCurrentDeviceStartCloseOperator().close();
+    if (currentDeviceIndex >= deviceEntries.size()) {
+      return;
+    }
+    constructCurrentDeviceOperatorTree();
+    queryDataSource.reset();
+    initQueryDataSource(queryDataSource);
+    this.operatorContext.recordSpecifiedInfo(
+        AbstractTableScanOperator.CURRENT_DEVICE_INDEX_STRING,
+        Integer.toString(currentDeviceIndex));
+  }
+
+  /**
+   * Builds the operator subtree of the device at {@code currentDeviceIndex} and marks it as not
+   * yet initialized. Does nothing when the device list is empty.
+   */
+  private void constructCurrentDeviceOperatorTree() {
+    if (this.deviceEntries.isEmpty()) {
+      return;
+    }
+    if (this.deviceEntries.get(this.currentDeviceIndex) == null) {
+      throw new IllegalStateException(
+          "Device entries of index " + this.currentDeviceIndex + " is empty");
+    }
+    DeviceEntry deviceEntry = this.deviceEntries.get(this.currentDeviceIndex);
+
+    deviceChildOperatorTreeGenerator.generateCurrentDeviceOperatorTree(deviceEntry);
+    currentDeviceRootOperator = deviceChildOperatorTreeGenerator.getCurrentDeviceRootOperator();
+    dataSourceOperators = deviceChildOperatorTreeGenerator.getCurrentDeviceDataSourceOperators();
+    currentDeviceInit = false;
+  }
+
+  /**
+   * Remembers the data source and forwards it to every data source operator of the current
+   * device subtree.
+   */
+  @Override
+  public void initQueryDataSource(IQueryDataSource dataSource) {
+    this.queryDataSource = (QueryDataSource) dataSource;
+    if (dataSourceOperators == null || dataSourceOperators.isEmpty()) {
+      return;
+    }
+    for (Operator operator : dataSourceOperators) {
+      ((AbstractDataSourceOperator) operator).initQueryDataSource(dataSource);
+    }
+  }
+
+  /**
+   * Returns the next block of the current device, or {@code null} while isBlocked has not yet
+   * been called for the current device.
+   *
+   * @throws NoSuchElementException if {@link #hasNext()} is false
+   */
+  @Override
+  public TsBlock next() throws Exception {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    if (!currentDeviceInit) {
+      return null;
+    }
+    return currentDeviceRootOperator.next();
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  /** Marks the current device as initialized and delegates to the current device subtree. */
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    currentDeviceInit = true;
+    if (currentDeviceRootOperator == null) {
+      // No operator tree exists (e.g. the device list was empty), so nothing can block.
+      return com.google.common.util.concurrent.Futures.immediateFuture(null);
+    }
+    return currentDeviceRootOperator.isBlocked();
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (currentDeviceRootOperator != null) {
+      currentDeviceRootOperator.close();
+    }
+  }
+
+  @Override
+  protected List<TSDataType> getResultDataTypes() {
+    throw new UnsupportedOperationException(
+        "Should not call getResultDataTypes() method in DeviceIteratorScanOperator");
+  }
+
+  // The three size estimates below delegate to the current device subtree and report 0 when no
+  // subtree exists.
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return currentDeviceRootOperator == null
+        ? 0
+        : currentDeviceRootOperator.calculateMaxPeekMemory();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return currentDeviceRootOperator == null
+        ? 0
+        : currentDeviceRootOperator.calculateMaxReturnSize();
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return currentDeviceRootOperator == null
+        ? 0
+        : currentDeviceRootOperator.calculateRetainedSizeAfterCallingNext();
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    return INSTANCE_SIZE
+        + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+        + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(currentDeviceRootOperator)
+        + RamUsageEstimator.sizeOfCollection(deviceEntries);
+  }
+
+  /** Plain holder for the values needed to set up a scan over non-aligned tree devices. */
+  public static class TreeNonAlignedDeviceViewScanParameters {
+    public final OperatorContext context;
+    public final List<DeviceEntry> deviceEntries;
+    public final List<String> measurementColumnNames;
+    public final Set<String> allSensors;
+    public final List<IMeasurementSchema> measurementSchemas;
+    public final DeviceChildOperatorTreeGenerator generator;
+
+    public TreeNonAlignedDeviceViewScanParameters(
+        Set<String> allSensors,
+        OperatorContext context,
+        List<DeviceEntry> deviceEntries,
+        List<String> measurementColumnNames,
+        List<IMeasurementSchema> measurementSchemas,
+        DeviceChildOperatorTreeGenerator generator) {
+      this.allSensors = allSensors;
+      this.context = context;
+      this.deviceEntries = deviceEntries;
+      this.measurementColumnNames = measurementColumnNames;
+      this.measurementSchemas = measurementSchemas;
+      this.generator = generator;
+    }
+  }
+
+  /** Builds and exposes the operator subtree used to read a single device. */
+  public interface DeviceChildOperatorTreeGenerator {
+    // Whether the offset and limit operators need to be kept after the device iterator
+    boolean keepOffsetAndLimitOperatorAfterDeviceIterator();
+
+    // Generates the operator subtree for the given deviceEntry
+    void generateCurrentDeviceOperatorTree(DeviceEntry deviceEntry);
+
+    // Returns the root operator of the subtree
+    Operator getCurrentDeviceRootOperator();
+
+    // Returns all DataSourceOperators created this time, for use by initQueryDataSource in
+    // DeviceIteratorScanOperator
+    List<Operator> getCurrentDeviceDataSourceOperators();
+
+    // Returns the operator to close when switching to the next device
+    Operator getCurrentDeviceStartCloseOperator();
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MeasurementToTableViewAdaptorUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MeasurementToTableViewAdaptorUtils.java
new file mode 100644
index 00000000000..0088b2a0956
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MeasurementToTableViewAdaptorUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.utils.Binary;
+
+import java.util.List;
+import java.util.Optional;
+
+import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE;
+
+/** Static helpers that turn a block of measurement data into a table-shaped block. */
+public class MeasurementToTableViewAdaptorUtils {
+  private MeasurementToTableViewAdaptorUtils() {}
+
+  /**
+   * Builds a table block with one column per entry of {@code columnsIndexArray}.
+   *
+   * <p>The category of {@code columnSchemas.get(i)} decides where output column {@code i} comes
+   * from:
+   *
+   * <ul>
+   *   <li>TAG: the value returned by {@code func} for {@code columnsIndexArray[i]}, repeated for
+   *       every row
+   *   <li>ATTRIBUTE: element {@code columnsIndexArray[i]} of the device's attribute values,
+   *       repeated for every row
+   *   <li>FIELD: column {@code columnsIndexArray[i]} of {@code measurementDataBlock}
+   *   <li>TIME: the time column of {@code measurementDataBlock}
+   * </ul>
+   *
+   * <p>The time column of the returned block is a run-length encoded {@code TIME_COLUMN_TEMPLATE}.
+   *
+   * @param measurementDataBlock the measurement data, may be null
+   * @param columnsIndexArray per output column, the index into its source
+   * @param columnSchemas per output column, the schema that supplies the column category
+   * @param deviceEntry the device whose attribute values are used for ATTRIBUTE columns
+   * @param func supplies the value of TAG columns
+   * @return the table block, or null when {@code measurementDataBlock} is null
+   * @throws IllegalArgumentException if a column has any other category
+   */
+  public static TsBlock toTableBlock(
+      TsBlock measurementDataBlock,
+      int[] columnsIndexArray,
+      List<ColumnSchema> columnSchemas,
+      DeviceEntry deviceEntry,
+      GetNthIdColumnValueFunc func) {
+    if (measurementDataBlock == null) {
+      return null;
+    }
+    final int rowCount = measurementDataBlock.getPositionCount();
+    final int columnCount = columnsIndexArray.length;
+    final Column[] tableColumns = new Column[columnCount];
+    for (int target = 0; target < columnCount; target++) {
+      final int sourceIndex = columnsIndexArray[target];
+      switch (columnSchemas.get(target).getColumnCategory()) {
+        case TAG:
+          String tagValue = func.getNthIdColumnValue(sourceIndex);
+          Binary tagBinary =
+              tagValue == null ? null : new Binary(tagValue, TSFileConfig.STRING_CHARSET);
+          tableColumns[target] = getIdOrAttributeValueColumn(tagBinary, rowCount);
+          break;
+        case ATTRIBUTE:
+          Binary attributeValue = deviceEntry.getAttributeColumnValues()[sourceIndex];
+          tableColumns[target] = getIdOrAttributeValueColumn(attributeValue, rowCount);
+          break;
+        case FIELD:
+          tableColumns[target] = measurementDataBlock.getColumn(sourceIndex);
+          break;
+        case TIME:
+          tableColumns[target] = measurementDataBlock.getTimeColumn();
+          break;
+        default:
+          throw new IllegalArgumentException(
+              "Unexpected column category: " + columnSchemas.get(target).getColumnCategory());
+      }
+    }
+    RunLengthEncodedColumn timeColumn = new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, rowCount);
+    return new TsBlock(rowCount, timeColumn, tableColumns);
+  }
+
+  /**
+   * Wraps a single (possibly null) value in a run-length encoded column of {@code positionCount}
+   * rows. A null value produces a column whose single underlying entry is marked null.
+   */
+  private static RunLengthEncodedColumn getIdOrAttributeValueColumn(
+      Binary value, int positionCount) {
+    final BinaryColumn singleValueColumn;
+    if (value != null) {
+      singleValueColumn = new BinaryColumn(1, Optional.empty(), new Binary[] {value});
+    } else {
+      singleValueColumn =
+          new BinaryColumn(1, Optional.of(new boolean[] {true}), new Binary[] {null});
+    }
+    return new RunLengthEncodedColumn(singleValueColumn, positionCount);
+  }
+
+  /** Supplies the value of the id (tag) column at a given index. */
+  @FunctionalInterface
+  public interface GetNthIdColumnValueFunc {
+    String getNthIdColumnValue(int idColumnIndex);
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/OffsetOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeToTableViewAdaptorOperator.java
similarity index 52%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/OffsetOperator.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeToTableViewAdaptorOperator.java
index a8581466f89..ff631af4072 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/OffsetOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeToTableViewAdaptorOperator.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -17,69 +17,80 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.operator.process;
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.utils.RamUsageEstimator;
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
-public class OffsetOperator implements ProcessOperator {
+import java.util.List;
+public class TreeToTableViewAdaptorOperator implements ProcessOperator {
private static final long INSTANCE_SIZE =
- RamUsageEstimator.shallowSizeOfInstance(OffsetOperator.class);
+
RamUsageEstimator.shallowSizeOfInstance(TreeToTableViewAdaptorOperator.class);
private final OperatorContext operatorContext;
- private long remainingOffset;
+ private final DeviceEntry currentDeviceEntry;
+ private final int[] columnsIndexArray;
+ private final List<ColumnSchema> columnSchemas;
private final Operator child;
-
- public OffsetOperator(OperatorContext operatorContext, long offset, Operator
child) {
- this.operatorContext = requireNonNull(operatorContext, "operatorContext is
null");
- checkArgument(offset >= 0, "offset must be at least zero");
- this.remainingOffset = offset;
- this.child = requireNonNull(child, "child operator is null");
+ private final IDeviceID.TreeDeviceIdColumnValueExtractor extractor;
+
+ public TreeToTableViewAdaptorOperator(
+ OperatorContext operatorContext,
+ DeviceEntry deviceEntry,
+ int[] columnIndexArray,
+ List<ColumnSchema> columnSchemas,
+ Operator child,
+ IDeviceID.TreeDeviceIdColumnValueExtractor extractor) {
+ this.operatorContext = operatorContext;
+ this.currentDeviceEntry = deviceEntry;
+ this.columnsIndexArray = columnIndexArray;
+ this.columnSchemas = columnSchemas;
+ this.child = child;
+ this.extractor = extractor;
}
@Override
- public OperatorContext getOperatorContext() {
- return operatorContext;
+ public boolean hasNext() throws Exception {
+ return child.hasNext();
}
@Override
- public ListenableFuture<?> isBlocked() {
- return child.isBlocked();
+ public TsBlock next() throws Exception {
+ TsBlock measurementDataBlock = child.next();
+ return MeasurementToTableViewAdaptorUtils.toTableBlock(
+ measurementDataBlock,
+ columnsIndexArray,
+ columnSchemas,
+ currentDeviceEntry,
+ idColumnIndex -> getNthIdColumnValue(currentDeviceEntry,
idColumnIndex));
+ }
+
+ private String getNthIdColumnValue(DeviceEntry deviceEntry, int
idColumnIndex) {
+ return (String) extractor.extract(deviceEntry.getDeviceID(),
idColumnIndex);
}
@Override
- public TsBlock next() throws Exception {
- TsBlock block = child.nextWithTimer();
- if (block == null) {
- return null;
- }
- if (remainingOffset > 0) {
- // It's safe to narrow long to int here, because
block.getPositionCount() will always be less
- // than Integer.MAX_VALUE
- int offset = (int) Math.min(remainingOffset, block.getPositionCount());
- remainingOffset -= offset;
- return block.getRegion(offset, block.getPositionCount() - offset);
- } else {
- return block;
- }
+ public void close() throws Exception {
+ child.close();
}
@Override
- public boolean hasNext() throws Exception {
- return child.hasNextWithTimer();
+ public ListenableFuture<?> isBlocked() {
+ return child.isBlocked();
}
@Override
- public void close() throws Exception {
- child.close();
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
}
@Override
@@ -89,7 +100,7 @@ public class OffsetOperator implements ProcessOperator {
@Override
public long calculateMaxPeekMemory() {
- return child.calculateMaxPeekMemoryWithCounter();
+ return child.calculateMaxPeekMemory();
}
@Override
@@ -105,7 +116,10 @@ public class OffsetOperator implements ProcessOperator {
@Override
public long ramBytesUsed() {
return INSTANCE_SIZE
+ +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+ MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(child)
- +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext);
+ +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(currentDeviceEntry)
+ + RamUsageEstimator.sizeOf(columnsIndexArray)
+ + RamUsageEstimator.sizeOfCollection(columnSchemas);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index fd54b3135a1..ae0956a42d3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.AlignedFullPath;
+import org.apache.iotdb.commons.path.NonAlignedFullPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
@@ -70,7 +71,14 @@ import
org.apache.iotdb.db.queryengine.execution.operator.process.gapfill.GapFil
import
org.apache.iotdb.db.queryengine.execution.operator.process.gapfill.GapFillWGroupWoMoOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.gapfill.GapFillWoGroupWMoOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.gapfill.GapFillWoGroupWoMoOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.InnerTimeJoinOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.SimpleNestedLoopCrossJoinOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.TableLeftOuterTimeJoinOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.ColumnMerger;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.DescTimeComparator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.comparator.JoinKeyComparatorFactory;
import
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryUtil;
import
org.apache.iotdb.db.queryengine.execution.operator.schema.CountMergeOperator;
@@ -81,9 +89,11 @@ import
org.apache.iotdb.db.queryengine.execution.operator.schema.source.SchemaSo
import
org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractAggTableScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.DefaultAggTableScanOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.DeviceIteratorScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.InformationSchemaTableScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.LastQueryAggTableScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.MarkDistinctOperator;
@@ -94,6 +104,7 @@ import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.Merg
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeAlignedDeviceViewAggregationScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeAlignedDeviceViewScanOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeToTableViewAdaptorOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastByDescAccumulator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastDescAccumulator;
@@ -132,6 +143,8 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.CastToTimestampLi
import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolsExtractor;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
@@ -365,8 +378,489 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
@Override
public Operator visitTreeNonAlignedDeviceViewScan(
TreeNonAlignedDeviceViewScanNode node, LocalExecutionPlanContext
context) {
- throw new UnsupportedOperationException(
- "view for non aligned devices in tree is not supported");
+
+ DeviceIteratorScanOperator.TreeNonAlignedDeviceViewScanParameters
parameter =
+ constructTreeNonAlignedDeviceViewScanOperatorParameter(
+ node,
+ context,
+ TreeNonAlignedDeviceViewScanNode.class.getSimpleName(),
+ node.getMeasurementColumnNameMap());
+
+ DeviceIteratorScanOperator treeNonAlignedDeviceIteratorScanOperator =
+ new DeviceIteratorScanOperator(
+ parameter.context, parameter.deviceEntries, parameter.generator);
+ addSource(
+ treeNonAlignedDeviceIteratorScanOperator,
+ context,
+ node,
+ parameter.measurementColumnNames,
+ parameter.measurementSchemas,
+ parameter.allSensors,
+ TreeNonAlignedDeviceViewScanNode.class.getSimpleName());
+
+ if (!parameter.generator.keepOffsetAndLimitOperatorAfterDeviceIterator()) {
+ return treeNonAlignedDeviceIteratorScanOperator;
+ }
+ Operator operator = treeNonAlignedDeviceIteratorScanOperator;
+ if (node.getPushDownOffset() > 0) {
+ operator = new OffsetOperator(parameter.context,
node.getPushDownOffset(), operator);
+ }
+ if (node.getPushDownLimit() > 0) {
+ operator = new LimitOperator(parameter.context, node.getPushDownLimit(),
operator);
+ }
+ return operator;
+ }
+
+ private DeviceIteratorScanOperator.TreeNonAlignedDeviceViewScanParameters
+ constructTreeNonAlignedDeviceViewScanOperatorParameter(
+ TreeNonAlignedDeviceViewScanNode node,
+ LocalExecutionPlanContext context,
+ String className,
+ Map<String, String> fieldColumnsRenameMap) {
+ if (node.isPushLimitToEachDevice() && node.getPushDownOffset() > 0) {
+ throw new IllegalArgumentException(
+ "PushDownOffset should not be set when isPushLimitToEachDevice is
true.");
+ }
+ CommonTableScanOperatorParameters commonParameter =
+ new CommonTableScanOperatorParameters(node, fieldColumnsRenameMap,
true);
+ List<IMeasurementSchema> measurementSchemas =
commonParameter.measurementSchemas;
+ List<String> measurementColumnNames =
commonParameter.measurementColumnNames;
+ List<ColumnSchema> fullColumnSchemas = commonParameter.columnSchemas;
+ int[] columnsIndexArray = commonParameter.columnsIndexArray;
+
+ boolean isSingleColumn = measurementSchemas.size() == 1;
+
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(context.getNextOperatorId(),
node.getPlanNodeId(), className);
+
+ Set<String> allSensors = new HashSet<>(measurementColumnNames);
+
+ DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator
deviceChildOperatorTreeGenerator =
+ new DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator() {
+
+ private Operator operator;
+ private List<SeriesScanOptions> seriesScanOptionsList;
+ private List<Operator> seriesScanOperators;
+ private FilterAndProjectOperator filterAndProjectOperator;
+ private OffsetOperator reuseOffsetOperator;
+ private LimitOperator reuseLimitOperator;
+ private Operator startCloseInternalOperator;
+
+ private List<Expression> cannotPushDownConjuncts;
+ private boolean removeUpperOffsetAndLimitOperator;
+
+ @Override
+ public boolean keepOffsetAndLimitOperatorAfterDeviceIterator() {
+ calculateSeriesScanOptionsList();
+ return !removeUpperOffsetAndLimitOperator &&
!node.isPushLimitToEachDevice();
+ }
+
+ @Override
+ public void generateCurrentDeviceOperatorTree(DeviceEntry
deviceEntry) {
+ calculateSeriesScanOptionsList();
+ operator = constructTreeToTableViewAdaptorOperator(deviceEntry);
+ if (isSingleColumn) {
+ return;
+ }
+ if (!cannotPushDownConjuncts.isEmpty()
+ || node.getAssignments().size() !=
node.getOutputSymbols().size()) {
+ operator = getFilterAndProjectOperator(operator);
+ }
+ if (!node.isPushLimitToEachDevice() ||
removeUpperOffsetAndLimitOperator) {
+ return;
+ }
+ if (node.getPushDownLimit() > 0) {
+ operator = new LimitOperator(operatorContext,
node.getPushDownLimit(), operator);
+ }
+ }
+
+ private void calculateSeriesScanOptionsList() {
+ if (seriesScanOptionsList != null) {
+ return;
+ }
+ seriesScanOptionsList = new ArrayList<>(measurementSchemas.size());
+ cannotPushDownConjuncts = new ArrayList<>();
+ Map<String, List<Expression>> pushDownConjunctsForEachMeasurement
= new HashMap<>();
+ if (node.getPushDownPredicate() != null) {
+ List<Expression> conjuncts =
IrUtils.extractConjuncts(node.getPushDownPredicate());
+ for (Expression conjunct : conjuncts) {
+ Set<Symbol> symbols = SymbolsExtractor.extractUnique(conjunct);
+ boolean containsMultiDataSource = symbols.size() > 1;
+ if (containsMultiDataSource) {
+ cannotPushDownConjuncts.add(conjunct);
+ continue;
+ }
+ String symbolName = symbols.iterator().next().getName();
+ pushDownConjunctsForEachMeasurement
+ .computeIfAbsent(symbolName, k -> new ArrayList<>())
+ .add(conjunct);
+ }
+ }
+
+ boolean canPushDownLimit = cannotPushDownConjuncts.isEmpty();
+ // only use full outer time join
+ boolean canPushDownLimitToAllSeriesScanOptions =
+ canPushDownLimit &&
pushDownConjunctsForEachMeasurement.isEmpty();
+ // the left child of LeftOuterTimeJoinOperator is
SeriesScanOperator
+ boolean pushDownLimitToLeftChildSeriesScanOperator =
+ canPushDownLimit && pushDownConjunctsForEachMeasurement.size()
== 1;
+ // the left child of LeftOuterTimeJoinOperator is
InnerTimeJoinOperator
+ boolean pushDownOffsetAndLimitAfterInnerJoinOperator =
+ canPushDownLimit && pushDownConjunctsForEachMeasurement.size()
> 1;
+ removeUpperOffsetAndLimitOperator =
+ pushDownLimitToLeftChildSeriesScanOperator
+ || pushDownOffsetAndLimitAfterInnerJoinOperator
+ || isSingleColumn;
+ for (int i = 0; i < measurementSchemas.size(); i++) {
+ IMeasurementSchema measurementSchema = measurementSchemas.get(i);
+ List<Expression> pushDownPredicatesForCurrentMeasurement =
+
pushDownConjunctsForEachMeasurement.get(measurementSchema.getMeasurementName());
+ Expression pushDownPredicateForCurrentMeasurement =
+ isSingleColumn
+ ? node.getPushDownPredicate()
+ : (pushDownPredicatesForCurrentMeasurement == null
+ ? null
+ :
IrUtils.combineConjuncts(pushDownPredicatesForCurrentMeasurement));
+ SeriesScanOptions.Builder builder =
+ node.getTimePredicate()
+ .map(expression -> getSeriesScanOptionsBuilder(context,
expression))
+ .orElseGet(SeriesScanOptions.Builder::new);
+ builder.withAllSensors(new HashSet<>(measurementColumnNames));
+ if (pushDownPredicateForCurrentMeasurement != null) {
+ builder.withPushDownFilter(
+ convertPredicateToFilter(
+ pushDownPredicateForCurrentMeasurement,
+
Collections.singletonMap(measurementSchema.getMeasurementName(), 0),
+ commonParameter.columnSchemaMap,
+ commonParameter.timeColumnName));
+ }
+ if (isSingleColumn
+ || (pushDownLimitToLeftChildSeriesScanOperator
+ && pushDownPredicateForCurrentMeasurement != null)) {
+ builder.withPushDownLimit(node.getPushDownLimit());
+
builder.withPushLimitToEachDevice(node.isPushLimitToEachDevice());
+ }
+ if (canPushDownLimitToAllSeriesScanOptions) {
+ builder.withPushDownLimit(node.getPushDownLimit() +
node.getPushDownOffset());
+ }
+ if (isSingleColumn
+ || (pushDownLimitToLeftChildSeriesScanOperator
+ && pushDownPredicateForCurrentMeasurement != null)) {
+ builder.withPushDownOffset(
+ node.isPushLimitToEachDevice() ? 0 :
node.getPushDownOffset());
+ }
+ seriesScanOptionsList.add(builder.build());
+ }
+ }
+
+ private Operator constructTreeToTableViewAdaptorOperator(DeviceEntry
deviceEntry) {
+ seriesScanOperators = new ArrayList<>(measurementSchemas.size());
+ operator = constructAndJoinScanOperators(deviceEntry);
+ return new TreeToTableViewAdaptorOperator(
+ operatorContext,
+ deviceEntry,
+ columnsIndexArray,
+ fullColumnSchemas,
+ operator,
+ TableOperatorGenerator.createTreeDeviceIdColumnValueExtractor(
+ node.getTreeDBName()));
+ }
+
+ private Operator constructAndJoinScanOperators(DeviceEntry
deviceEntry) {
+ List<Operator> childrenWithPushDownPredicate = new ArrayList<>();
+ List<TSDataType> innerJoinDataTypeList = new ArrayList<>();
+ List<Operator> childrenWithoutPushDownPredicate = new
ArrayList<>();
+ List<TSDataType> fullOuterTimeJoinDataTypeList = new ArrayList<>();
+ Map<InputLocation, Integer> leftOuterJoinColumnIndexMap = new
HashMap<>();
+ for (int i = 0; i < measurementSchemas.size(); i++) {
+ IMeasurementSchema measurementSchema = measurementSchemas.get(i);
+ NonAlignedFullPath path =
+ new NonAlignedFullPath(deviceEntry.getDeviceID(),
measurementSchema);
+ SeriesScanOptions seriesScanOptions =
seriesScanOptionsList.get(i);
+ Operator seriesScanOperator =
+ new SeriesScanOperator(
+ operatorContext,
+ node.getPlanNodeId(),
+ path,
+ node.getScanOrder(),
+ seriesScanOptions);
+ seriesScanOperators.add(seriesScanOperator);
+ if (seriesScanOptions.getPushDownFilter() != null) {
+ childrenWithPushDownPredicate.add(seriesScanOperator);
+ innerJoinDataTypeList.add(measurementSchema.getType());
+ leftOuterJoinColumnIndexMap.put(
+ new InputLocation(0, childrenWithPushDownPredicate.size()
- 1), i);
+ } else {
+ childrenWithoutPushDownPredicate.add(seriesScanOperator);
+ fullOuterTimeJoinDataTypeList.add(measurementSchema.getType());
+ leftOuterJoinColumnIndexMap.put(
+ new InputLocation(1,
childrenWithoutPushDownPredicate.size() - 1), i);
+ }
+ }
+ Operator leftChild =
+ generateInnerTimeJoinOperator(childrenWithPushDownPredicate,
innerJoinDataTypeList);
+ Operator rightChild =
+ generateFullOuterTimeJoinOperator(
+ childrenWithoutPushDownPredicate,
fullOuterTimeJoinDataTypeList);
+ return generateLeftOuterTimeJoinOperator(
+ leftChild,
+ rightChild,
+ childrenWithPushDownPredicate.size(),
+ leftOuterJoinColumnIndexMap,
+ IMeasurementSchema.getDataTypeList(measurementSchemas));
+ }
+
+ private Operator generateInnerTimeJoinOperator(
+ List<Operator> operators, List<TSDataType> dataTypes) {
+ if (operators.isEmpty()) {
+ return null;
+ }
+ if (operators.size() == 1) {
+ return operators.get(0);
+ }
+ Map<InputLocation, Integer> outputColumnMap = new HashMap<>();
+ for (int i = 0; i < operators.size(); i++) {
+ outputColumnMap.put(new InputLocation(i, 0), i);
+ }
+ Operator currentOperator =
+ new InnerTimeJoinOperator(
+ operatorContext,
+ operators,
+ dataTypes,
+ node.getScanOrder() == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator(),
+ outputColumnMap);
+ boolean addOffsetAndLimitOperatorAfterLeftChild =
+ operators.size() > 1 && cannotPushDownConjuncts.isEmpty();
+ if (addOffsetAndLimitOperatorAfterLeftChild) {
+ if (node.getPushDownOffset() > 0) {
+ currentOperator = getReuseOffsetOperator(currentOperator);
+ }
+ if (node.getPushDownLimit() > 0) {
+ currentOperator = getReuseLimitOperator(currentOperator);
+ }
+ }
+ return currentOperator;
+ }
+
+ private Operator generateFullOuterTimeJoinOperator(
+ List<Operator> operators, List<TSDataType> dataTypes) {
+ if (operators.isEmpty()) {
+ return null;
+ }
+ if (operators.size() == 1) {
+ return operators.get(0);
+ }
+ List<ColumnMerger> columnMergers = new
ArrayList<>(operators.size());
+ for (int i = 0; i < operators.size(); i++) {
+ columnMergers.add(
+ new SingleColumnMerger(
+ new InputLocation(i, 0),
+ node.getScanOrder() == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator()));
+ }
+ return new FullOuterTimeJoinOperator(
+ operatorContext,
+ operators,
+ node.getScanOrder(),
+ dataTypes,
+ columnMergers,
+ node.getScanOrder() == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator());
+ }
+
+ private Operator generateLeftOuterTimeJoinOperator(
+ Operator left,
+ Operator right,
+ int leftColumnCount,
+ Map<InputLocation, Integer> outputColumnMap,
+ List<TSDataType> dataTypes) {
+ if (left == null) {
+ return right;
+ } else if (right == null) {
+ return left;
+ } else {
+ return new TableLeftOuterTimeJoinOperator(
+ operatorContext,
+ left,
+ right,
+ leftColumnCount,
+ outputColumnMap,
+ dataTypes,
+ node.getScanOrder() == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator());
+ }
+ }
+
+ private Operator getReuseOffsetOperator(Operator child) {
+ this.reuseOffsetOperator =
+ reuseOffsetOperator == null
+ ? new OffsetOperator(operatorContext,
node.getPushDownOffset(), child)
+ : new OffsetOperator(reuseOffsetOperator, child);
+ return this.reuseOffsetOperator;
+ }
+
+ private Operator getReuseLimitOperator(Operator child) {
+ this.reuseLimitOperator =
+ reuseLimitOperator == null
+ ? new LimitOperator(operatorContext,
node.getPushDownLimit(), child)
+ : new LimitOperator(reuseLimitOperator, child);
+ return this.reuseLimitOperator;
+ }
+
+ private Operator getFilterAndProjectOperator(Operator childOperator)
{
+ startCloseInternalOperator = childOperator;
+ if (filterAndProjectOperator != null) {
+ return new FilterAndProjectOperator(filterAndProjectOperator,
childOperator);
+ }
+ List<TSDataType> inputDataTypeList = new
ArrayList<>(fullColumnSchemas.size());
+ Map<Symbol, List<InputLocation>> symbolInputLocationMap =
+ new HashMap<>(fullColumnSchemas.size());
+ for (int i = 0; i < fullColumnSchemas.size(); i++) {
+ ColumnSchema columnSchema = fullColumnSchemas.get(i);
+ symbolInputLocationMap
+ .computeIfAbsent(new Symbol(columnSchema.getName()), key ->
new ArrayList<>())
+ .add(new InputLocation(0, i));
+ inputDataTypeList.add(getTSDataType(columnSchema.getType()));
+ }
+ Expression combinedCannotPushDownPredicates =
+ cannotPushDownConjuncts.isEmpty()
+ ? null
+ : IrUtils.combineConjuncts(cannotPushDownConjuncts);
+ filterAndProjectOperator =
+ (FilterAndProjectOperator)
+
TableOperatorGenerator.this.constructFilterAndProjectOperator(
+ Optional.ofNullable(combinedCannotPushDownPredicates),
+ childOperator,
+ node.getOutputSymbols().stream()
+ .map(Symbol::toSymbolReference)
+ .toArray(Expression[]::new),
+ inputDataTypeList,
+ symbolInputLocationMap,
+ node.getPlanNodeId(),
+ context);
+ return filterAndProjectOperator;
+ }
+
+ @Override
+ public Operator getCurrentDeviceRootOperator() {
+ return operator;
+ }
+
+ @Override
+ public List<Operator> getCurrentDeviceDataSourceOperators() {
+ return seriesScanOperators;
+ }
+
+ @Override
+ public Operator getCurrentDeviceStartCloseOperator() {
+ return startCloseInternalOperator == null ? operator :
startCloseInternalOperator;
+ }
+ };
+
+ return new
DeviceIteratorScanOperator.TreeNonAlignedDeviceViewScanParameters(
+ allSensors,
+ operatorContext,
+ node.getDeviceEntries(),
+ measurementColumnNames,
+ measurementSchemas,
+ deviceChildOperatorTreeGenerator);
+ }
+
+ private static class CommonTableScanOperatorParameters {
+
+ List<Symbol> outputColumnNames;
+ List<ColumnSchema> columnSchemas;
+ int[] columnsIndexArray;
+ Map<Symbol, ColumnSchema> columnSchemaMap;
+ Map<Symbol, Integer> idAndAttributeColumnsIndexMap;
+ List<String> measurementColumnNames;
+ Map<String, Integer> measurementColumnsIndexMap;
+ String timeColumnName;
+ List<IMeasurementSchema> measurementSchemas;
+ int measurementColumnCount;
+ int idx;
+
+ private CommonTableScanOperatorParameters(
+ DeviceTableScanNode node,
+ Map<String, String> fieldColumnsRenameMap,
+ boolean keepNonOutputMeasurementColumns) {
+ outputColumnNames = node.getOutputSymbols();
+ int outputColumnCount =
+ keepNonOutputMeasurementColumns ? node.getAssignments().size() :
outputColumnNames.size();
+ columnSchemas = new ArrayList<>(outputColumnCount);
+ columnsIndexArray = new int[outputColumnCount];
+ columnSchemaMap = node.getAssignments();
+ idAndAttributeColumnsIndexMap = node.getIdAndAttributeIndexMap();
+ measurementColumnNames = new ArrayList<>();
+ measurementColumnsIndexMap = new HashMap<>();
+ measurementSchemas = new ArrayList<>();
+ measurementColumnCount = 0;
+ idx = 0;
+
+ for (Symbol columnName : outputColumnNames) {
+ ColumnSchema schema =
+ requireNonNull(columnSchemaMap.get(columnName), columnName + " is
null");
+
+ switch (schema.getColumnCategory()) {
+ case TAG:
+ case ATTRIBUTE:
+ columnsIndexArray[idx++] =
+ requireNonNull(
+ idAndAttributeColumnsIndexMap.get(columnName), columnName
+ " is null");
+ columnSchemas.add(schema);
+ break;
+ case FIELD:
+ columnsIndexArray[idx++] = measurementColumnCount;
+ measurementColumnCount++;
+
+ String realMeasurementName =
+ fieldColumnsRenameMap.getOrDefault(schema.getName(),
schema.getName());
+
+ measurementColumnNames.add(realMeasurementName);
+ measurementSchemas.add(
+ new MeasurementSchema(realMeasurementName,
getTSDataType(schema.getType())));
+ columnSchemas.add(schema);
+ measurementColumnsIndexMap.put(columnName.getName(),
measurementColumnCount - 1);
+ break;
+ case TIME:
+ columnsIndexArray[idx++] = -1;
+ columnSchemas.add(schema);
+ timeColumnName = columnName.getName();
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Unexpected column category: " + schema.getColumnCategory());
+ }
+ }
+ Set<Symbol> outputSet = new HashSet<>(outputColumnNames);
+ for (Map.Entry<Symbol, ColumnSchema> entry :
node.getAssignments().entrySet()) {
+ if (!outputSet.contains(entry.getKey()) &&
entry.getValue().getColumnCategory() == FIELD) {
+ if (keepNonOutputMeasurementColumns) {
+ columnSchemas.add(entry.getValue());
+ columnsIndexArray[idx++] = measurementColumnCount;
+ }
+ measurementColumnCount++;
+ String realMeasurementName =
+ fieldColumnsRenameMap.getOrDefault(
+ entry.getValue().getName(), entry.getValue().getName());
+
+ measurementColumnNames.add(realMeasurementName);
+ measurementSchemas.add(
+ new MeasurementSchema(
+ realMeasurementName,
getTSDataType(entry.getValue().getType())));
+ measurementColumnsIndexMap.put(entry.getKey().getName(),
measurementColumnCount - 1);
+ } else if (entry.getValue().getColumnCategory() == TIME) {
+ timeColumnName = entry.getKey().getName();
+ }
+ }
+ }
}
public static IDeviceID.TreeDeviceIdColumnValueExtractor
createTreeDeviceIdColumnValueExtractor(
@@ -453,78 +947,19 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
String className,
Map<String, String> fieldColumnsRenameMap) {
- List<Symbol> outputColumnNames = node.getOutputSymbols();
- int outputColumnCount = outputColumnNames.size();
- List<ColumnSchema> columnSchemas = new ArrayList<>(outputColumnCount);
- int[] columnsIndexArray = new int[outputColumnCount];
- Map<Symbol, ColumnSchema> columnSchemaMap = node.getAssignments();
- Map<Symbol, Integer> idAndAttributeColumnsIndexMap =
node.getIdAndAttributeIndexMap();
- List<String> measurementColumnNames = new ArrayList<>();
- Map<String, Integer> measurementColumnsIndexMap = new HashMap<>();
- String timeColumnName = null;
- List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
- int measurementColumnCount = 0;
- int idx = 0;
- for (Symbol columnName : outputColumnNames) {
- ColumnSchema schema =
- requireNonNull(columnSchemaMap.get(columnName), columnName + " is
null");
-
- switch (schema.getColumnCategory()) {
- case TAG:
- case ATTRIBUTE:
- columnsIndexArray[idx++] =
- requireNonNull(
- idAndAttributeColumnsIndexMap.get(columnName), columnName +
" is null");
- columnSchemas.add(schema);
- break;
- case FIELD:
- columnsIndexArray[idx++] = measurementColumnCount;
- measurementColumnCount++;
-
- String realMeasurementName =
- fieldColumnsRenameMap.getOrDefault(schema.getName(),
schema.getName());
-
- measurementColumnNames.add(realMeasurementName);
- measurementSchemas.add(
- new MeasurementSchema(realMeasurementName,
getTSDataType(schema.getType())));
- columnSchemas.add(schema);
- measurementColumnsIndexMap.put(columnName.getName(),
measurementColumnCount - 1);
- break;
- case TIME:
- columnsIndexArray[idx++] = -1;
- columnSchemas.add(schema);
- timeColumnName = columnName.getName();
- break;
- default:
- throw new IllegalArgumentException(
- "Unexpected column category: " + schema.getColumnCategory());
- }
- }
-
- Set<Symbol> outputSet = new HashSet<>(outputColumnNames);
- for (Map.Entry<Symbol, ColumnSchema> entry :
node.getAssignments().entrySet()) {
- if (!outputSet.contains(entry.getKey()) &&
entry.getValue().getColumnCategory() == FIELD) {
- measurementColumnCount++;
- String realMeasurementName =
- fieldColumnsRenameMap.getOrDefault(
- entry.getValue().getName(), entry.getValue().getName());
-
- measurementColumnNames.add(realMeasurementName);
- measurementSchemas.add(
- new MeasurementSchema(realMeasurementName,
getTSDataType(entry.getValue().getType())));
- measurementColumnsIndexMap.put(entry.getKey().getName(),
measurementColumnCount - 1);
- } else if (entry.getValue().getColumnCategory() == TIME) {
- timeColumnName = entry.getKey().getName();
- }
- }
-
+ CommonTableScanOperatorParameters commonParameter =
+ new CommonTableScanOperatorParameters(node, fieldColumnsRenameMap,
false);
+ List<IMeasurementSchema> measurementSchemas =
commonParameter.measurementSchemas;
+ List<String> measurementColumnNames =
commonParameter.measurementColumnNames;
+ List<ColumnSchema> columnSchemas = commonParameter.columnSchemas;
+ int[] columnsIndexArray = commonParameter.columnsIndexArray;
SeriesScanOptions seriesScanOptions =
buildSeriesScanOptions(
context,
- columnSchemaMap,
+ commonParameter.columnSchemaMap,
measurementColumnNames,
- measurementColumnsIndexMap,
- timeColumnName,
+ commonParameter.measurementColumnsIndexMap,
+ commonParameter.timeColumnName,
node.getTimePredicate(),
node.getPushDownLimit(),
node.getPushDownOffset(),
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceIteratorScanOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceIteratorScanOperatorTest.java
new file mode 100644
index 00000000000..951c2b895a1
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceIteratorScanOperatorTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.AlignedFullPath;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.DeviceIteratorScanOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.tsfile.read.common.block.column.BooleanColumn;
+import org.apache.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.tsfile.read.common.block.column.FloatColumn;
+import org.apache.tsfile.read.common.block.column.IntColumn;
+import org.apache.tsfile.read.common.block.column.LongColumn;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions.getDefaultSeriesScanOptions;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests {@link DeviceIteratorScanOperator} by iterating three device entries, each scanned
+ * through an {@link AlignedSeriesScanOperator}, over the files prepared by {@code
+ * AlignedSeriesTestUtil.setUp}.
+ */
+public class DeviceIteratorScanOperatorTest {
+
+  /** Prefix handed to the test-data utility and used to build the device IDs below. */
+  private static final String DEVICE_ITERATOR_SCAN_OPERATOR_TEST =
+      "root.DeviceIteratorScanOperatorTest";
+
+  /** Passed to {@code AlignedSeriesTestUtil.setUp}; read afterwards to build the aligned path. */
+  private static final List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private static final List<TsFileResource> seqResources = new ArrayList<>();
+  private static final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+  /** Tolerance used when comparing float and double column values. */
+  private static final double DELTA = 0.000001;
+
+  @BeforeClass
+  public static void setUp() throws MetadataException, IOException, WriteProcessException {
+    AlignedSeriesTestUtil.setUp(
+        measurementSchemas, seqResources, unSeqResources, DEVICE_ITERATOR_SCAN_OPERATOR_TEST);
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    AlignedSeriesTestUtil.tearDown(seqResources, unSeqResources);
+  }
+
+  /**
+   * Scans the same device three times through a {@link DeviceIteratorScanOperator} and checks the
+   * column types and the value of every returned row.
+   */
+  @Test
+  public void test1() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    DeviceIteratorScanOperator operator = null;
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      driverContext.addOperatorContext(
+          1, planNodeId, DeviceIteratorScanOperator.class.getSimpleName());
+
+      // All three entries point at device0 on purpose: the row counter below restarts every
+      // 500 rows, so each pass over the device is verified against the same expectations.
+      List<DeviceEntry> deviceEntries =
+          Arrays.asList(
+              new AlignedDeviceEntry(
+                  IDeviceID.Factory.DEFAULT_FACTORY.create(
+                      DEVICE_ITERATOR_SCAN_OPERATOR_TEST + ".device0"),
+                  new Binary[0]),
+              new AlignedDeviceEntry(
+                  IDeviceID.Factory.DEFAULT_FACTORY.create(
+                      DEVICE_ITERATOR_SCAN_OPERATOR_TEST + ".device0"),
+                  new Binary[0]),
+              new AlignedDeviceEntry(
+                  IDeviceID.Factory.DEFAULT_FACTORY.create(
+                      DEVICE_ITERATOR_SCAN_OPERATOR_TEST + ".device0"),
+                  new Binary[0]));
+
+      DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator generator =
+          new DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator() {
+
+            private Operator currentDeviceRootOperator;
+
+            @Override
+            public boolean keepOffsetAndLimitOperatorAfterDeviceIterator() {
+              return true;
+            }
+
+            @Override
+            public void generateCurrentDeviceOperatorTree(DeviceEntry deviceEntry) {
+              List<String> measurementNames =
+                  measurementSchemas.stream()
+                      .map(IMeasurementSchema::getMeasurementName)
+                      .collect(Collectors.toList());
+              // Plain copy: the elements already are IMeasurementSchema, no cast is needed.
+              List<IMeasurementSchema> schemas = new ArrayList<>(measurementSchemas);
+              AlignedFullPath alignedPath =
+                  new AlignedFullPath(deviceEntry.getDeviceID(), measurementNames, schemas);
+              currentDeviceRootOperator =
+                  new AlignedSeriesScanOperator(
+                      driverContext.getOperatorContexts().get(0),
+                      planNodeId,
+                      alignedPath,
+                      Ordering.ASC,
+                      getDefaultSeriesScanOptions(alignedPath),
+                      false,
+                      null,
+                      -1);
+            }
+
+            @Override
+            public Operator getCurrentDeviceRootOperator() {
+              return currentDeviceRootOperator;
+            }
+
+            @Override
+            public List<Operator> getCurrentDeviceDataSourceOperators() {
+              return Collections.singletonList(currentDeviceRootOperator);
+            }
+
+            @Override
+            public Operator getCurrentDeviceStartCloseOperator() {
+              return currentDeviceRootOperator;
+            }
+          };
+      operator =
+          new DeviceIteratorScanOperator(
+              driverContext.getOperatorContexts().get(0), deviceEntries, generator);
+      operator.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+
+      int count = 0;
+      while (operator.isBlocked().isDone() && operator.hasNext()) {
+        // Restart the expected timestamp once a full pass of 500 rows has been consumed.
+        if (count % 500 == 0) {
+          count = 0;
+        }
+        TsBlock tsBlock = operator.next();
+        if (tsBlock == null) {
+          continue;
+        }
+        assertEquals(measurementSchemas.size(), tsBlock.getValueColumnCount());
+        assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn);
+        assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
+        assertTrue(tsBlock.getColumn(2) instanceof LongColumn);
+        assertTrue(tsBlock.getColumn(3) instanceof FloatColumn);
+        assertTrue(tsBlock.getColumn(4) instanceof DoubleColumn);
+        assertTrue(tsBlock.getColumn(5) instanceof BinaryColumn);
+
+        for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
+          assertEquals(count, tsBlock.getTimeByIndex(i));
+          // Expected value = timestamp + an offset that depends on the timestamp range:
+          // [0, 200) -> 20000; [200, 260), [300, 380), [400, ..) -> 10000; otherwise 0.
+          int delta = 0;
+          if (count < 200) {
+            delta = 20000;
+          } else if (count < 260 || (count >= 300 && count < 380) || count >= 400) {
+            delta = 10000;
+          }
+          // Widened once so every assertion compares against the same long value.
+          long expected = delta + count;
+          assertEquals(expected % 2 == 0, tsBlock.getColumn(0).getBoolean(i));
+          assertEquals(expected, tsBlock.getColumn(1).getInt(i));
+          assertEquals(expected, tsBlock.getColumn(2).getLong(i));
+          assertEquals(expected, tsBlock.getColumn(3).getFloat(i), DELTA);
+          assertEquals(expected, tsBlock.getColumn(4).getDouble(i), DELTA);
+          assertEquals(String.valueOf(expected), tsBlock.getColumn(5).getBinary(i).toString());
+        }
+      }
+      assertEquals(500, count);
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      // Carry the cause into the failure report instead of failing with an empty message.
+      fail("DeviceIteratorScanOperator scan failed: " + e);
+    } finally {
+      if (operator != null) {
+        try {
+          operator.close();
+        } catch (Exception ignored) {
+          // Best-effort cleanup: a close failure must not mask the test outcome.
+        }
+      }
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/NonAlignedTreeDeviceViewScanOperatorTreeTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/NonAlignedTreeDeviceViewScanOperatorTreeTest.java
new file mode 100644
index 00000000000..69f7305e39d
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/NonAlignedTreeDeviceViewScanOperatorTreeTest.java
@@ -0,0 +1,650 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DataDriverContext;
+import org.apache.iotdb.db.queryengine.execution.fragment.DataNodeQueryContext;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.OffsetOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.DeviceIteratorScanOperator;
+import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanContext;
+import org.apache.iotdb.db.queryengine.plan.planner.TableOperatorGenerator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestMatadata;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.NonAlignedAlignedDeviceEntry;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolsExtractor;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeNonAlignedDeviceViewScanNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LogicalExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+import
org.apache.iotdb.db.storageengine.dataregion.read.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.read.common.type.TypeEnum;
+import org.apache.tsfile.read.common.type.TypeFactory;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.queryengine.execution.operator.Operator.NOT_BLOCKED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class NonAlignedTreeDeviceViewScanOperatorTreeTest {
+
+ // Prefix passed to SeriesReaderTestUtil.setUp and used to build the device IDs of the scan node.
+ private static final String
NON_ALIGNED_TREE_DEVICE_VIEW_SCAN_OPERATOR_TREE_TEST =
+ "root.NonAlignedTreeDeviceViewScanOperatorTreeTest";
+ // Generator under test; getOperator calls visitTreeNonAlignedDeviceViewScan on it.
+ private final TableOperatorGenerator tableOperatorGenerator =
+ new TableOperatorGenerator(new TestMatadata());
+ // Handed to SeriesReaderTestUtil.setUp (presumably filled there; confirm against the utility).
+ private final List<String> deviceIds = new ArrayList<>();
+ private final List<IMeasurementSchema> measurementSchemas = new
ArrayList<>();
+
+ // File resources handed to setUp/tearDown and wrapped into the QueryDataSource of each source operator.
+ private final List<TsFileResource> seqResources = new ArrayList<>();
+ private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+ // Column symbol -> schema; populated in setUp, used to build node assignments and to check result types.
+ private final Map<Symbol, ColumnSchema> columnSchemaMap = new HashMap<>();
+ // Built in setUp from the same symbols and passed to the LocalExecutionPlanContext in getOperator.
+ private TypeProvider typeProvider;
+
+ /**
+  * Prepares the test files through {@code SeriesReaderTestUtil.setUp}, then registers the schema
+  * and type of every column used by the tests: tag {@code tag1} (TEXT), {@code time} (INT64) and
+  * the INT32 fields {@code sensor0} to {@code sensor3}.
+  */
+ @Before
+ public void setUp() throws MetadataException, IOException, WriteProcessException {
+   SeriesReaderTestUtil.setUp(
+       measurementSchemas,
+       deviceIds,
+       seqResources,
+       unSeqResources,
+       NON_ALIGNED_TREE_DEVICE_VIEW_SCAN_OPERATOR_TREE_TEST);
+
+   // Column schemas looked up by symbol when building scan nodes and checking result types.
+   columnSchemaMap.put(
+       new Symbol("tag1"),
+       new ColumnSchema(
+           "tag1", TypeFactory.getType(TSDataType.TEXT), false, TsTableColumnCategory.TAG));
+   columnSchemaMap.put(
+       new Symbol("time"),
+       new ColumnSchema(
+           "time", TypeFactory.getType(TSDataType.INT64), false, TsTableColumnCategory.TIME));
+   List<String> fieldNames = Arrays.asList("sensor0", "sensor1", "sensor2", "sensor3");
+   for (String fieldName : fieldNames) {
+     columnSchemaMap.put(
+         new Symbol(fieldName),
+         new ColumnSchema(
+             fieldName,
+             TypeFactory.getType(TSDataType.INT32),
+             false,
+             TsTableColumnCategory.FIELD));
+   }
+
+   // Symbol -> type mapping backing the TypeProvider of the plan context.
+   Map<Symbol, Type> symbolTypes = new HashMap<>();
+   for (String fieldName : fieldNames) {
+     symbolTypes.put(new Symbol(fieldName), TypeFactory.getType(TSDataType.INT32));
+   }
+   symbolTypes.put(new Symbol("time"), TypeFactory.getType(TypeEnum.INT64));
+   symbolTypes.put(new Symbol("tag1"), TypeFactory.getType(TSDataType.TEXT));
+   typeProvider = new TypeProvider(symbolTypes);
+ }
+
+ // Releases the sequence and unsequence file resources prepared in setUp after every test.
+ @After
+ public void tearDown() throws IOException {
+ SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+ }
+
+ /**
+  * Pushes down {@code sensor1 > 1000} together with offset 500 and limit 500; expects the root to
+  * be a {@link DeviceIteratorScanOperator} returning 500 rows.
+  */
+ @Test
+ public void testScanWithPushDownPredicateAndLimitAndOffset() throws Exception {
+   List<String> outputColumnList = Arrays.asList("sensor0", "sensor1", "sensor2", "time", "tag1");
+   TreeNonAlignedDeviceViewScanNode node = getTreeNonAlignedDeviceViewScanNode(outputColumnList);
+   node.setPushDownOffset(500);
+   node.setPushDownLimit(500);
+   node.setPushDownPredicate(
+       new ComparisonExpression(
+           ComparisonExpression.Operator.GREATER_THAN,
+           new Symbol("sensor1").toSymbolReference(),
+           new LongLiteral("1000")));
+   ExecutorService instanceNotificationExecutor =
+       IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+   // The operator is created inside the try so that a failing plan generation or type assertion
+   // still closes the operator (if any) and shuts the executor down.
+   Operator operator = null;
+   try {
+     operator = getOperator(node, instanceNotificationExecutor);
+     assertTrue(operator instanceof DeviceIteratorScanOperator);
+     checkResult(operator, outputColumnList, 500);
+   } catch (Exception e) {
+     e.printStackTrace();
+     fail(e.getMessage());
+   } finally {
+     try {
+       if (operator != null) {
+         operator.close();
+       }
+     } finally {
+       instanceNotificationExecutor.shutdown();
+     }
+   }
+ }
+
+ /**
+  * Pushes down {@code sensor1 > 1000} with a per-device limit of 500; expects the root to be a
+  * {@link DeviceIteratorScanOperator} returning 1320 rows.
+  */
+ @Test
+ public void testScanWithPushDownPredicateAndPushLimitToEachDevice() throws Exception {
+   List<String> outputColumnList = Arrays.asList("sensor0", "sensor1", "sensor2", "time", "tag1");
+   TreeNonAlignedDeviceViewScanNode node = getTreeNonAlignedDeviceViewScanNode(outputColumnList);
+   node.setPushLimitToEachDevice(true);
+   node.setPushDownLimit(500);
+   node.setPushDownPredicate(
+       new ComparisonExpression(
+           ComparisonExpression.Operator.GREATER_THAN,
+           new Symbol("sensor1").toSymbolReference(),
+           new LongLiteral("1000")));
+   ExecutorService instanceNotificationExecutor =
+       IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+   // The operator is created inside the try so that a failing plan generation or type assertion
+   // still closes the operator (if any) and shuts the executor down.
+   Operator operator = null;
+   try {
+     operator = getOperator(node, instanceNotificationExecutor);
+     assertTrue(operator instanceof DeviceIteratorScanOperator);
+     checkResult(operator, outputColumnList, 1320);
+   } catch (Exception e) {
+     e.printStackTrace();
+     fail(e.getMessage());
+   } finally {
+     try {
+       if (operator != null) {
+         operator.close();
+       }
+     } finally {
+       instanceNotificationExecutor.shutdown();
+     }
+   }
+ }
+
+ /**
+  * Uses the predicate {@code (sensor0 > 1000 OR sensor1 > 1000) AND sensor2 > 1000 AND sensor3 >
+  * 1000} with a per-device limit of 100, where {@code sensor3} is assigned but not output;
+  * expects the root to be a {@link DeviceIteratorScanOperator} returning 300 rows.
+  */
+ @Test
+ public void
+     testScanWithCanPushDownPredicateAndCannotPushDownPredicateAndPushLimitToEachDevice()
+         throws Exception {
+   ExecutorService instanceNotificationExecutor =
+       IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+   // Everything after the executor is created runs inside the try, so that a failure while
+   // building the node or the operator still shuts the executor down.
+   Operator operator = null;
+   try {
+     List<String> outputColumnList =
+         Arrays.asList("sensor0", "sensor1", "sensor2", "time", "tag1");
+     TreeNonAlignedDeviceViewScanNode node =
+         getTreeNonAlignedDeviceViewScanNode(
+             outputColumnList,
+             Arrays.asList("sensor0", "sensor1", "sensor2", "time", "tag1", "sensor3"));
+     node.setPushDownLimit(100);
+     node.setPushLimitToEachDevice(true);
+     node.setPushDownPredicate(
+         new LogicalExpression(
+             LogicalExpression.Operator.AND,
+             Arrays.asList(
+                 new LogicalExpression(
+                     LogicalExpression.Operator.OR,
+                     Arrays.asList(
+                         new ComparisonExpression(
+                             ComparisonExpression.Operator.GREATER_THAN,
+                             new Symbol("sensor0").toSymbolReference(),
+                             new LongLiteral("1000")),
+                         new ComparisonExpression(
+                             ComparisonExpression.Operator.GREATER_THAN,
+                             new Symbol("sensor1").toSymbolReference(),
+                             new LongLiteral("1000")))),
+                 new ComparisonExpression(
+                     ComparisonExpression.Operator.GREATER_THAN,
+                     new Symbol("sensor2").toSymbolReference(),
+                     new LongLiteral("1000")),
+                 new ComparisonExpression(
+                     ComparisonExpression.Operator.GREATER_THAN,
+                     new Symbol("sensor3").toSymbolReference(),
+                     new LongLiteral("1000")))));
+     operator = getOperator(node, instanceNotificationExecutor);
+     assertTrue(operator instanceof DeviceIteratorScanOperator);
+     checkResult(operator, outputColumnList, 300);
+   } catch (Exception e) {
+     e.printStackTrace();
+     fail(e.getMessage());
+   } finally {
+     try {
+       if (operator != null) {
+         operator.close();
+       }
+     } finally {
+       instanceNotificationExecutor.shutdown();
+     }
+   }
+ }
+
+ /**
+  * Filters on {@code sensor3 > 1000}, a column that is assigned but not output, with a per-device
+  * limit of 10; expects the root to be a {@link DeviceIteratorScanOperator} returning 30 rows.
+  */
+ @Test
+ public void testScanWithPushDownPredicateAndPushLimitToEachDevice1() throws Exception {
+   List<String> outputColumnList = Arrays.asList("sensor0", "sensor1", "sensor2", "time", "tag1");
+   TreeNonAlignedDeviceViewScanNode node =
+       getTreeNonAlignedDeviceViewScanNode(
+           outputColumnList,
+           Arrays.asList("sensor0", "sensor1", "sensor2", "time", "tag1", "sensor3"));
+   node.setPushDownPredicate(
+       new ComparisonExpression(
+           ComparisonExpression.Operator.GREATER_THAN,
+           new Symbol("sensor3").toSymbolReference(),
+           new LongLiteral("1000")));
+   node.setPushLimitToEachDevice(true);
+   node.setPushDownLimit(10);
+   ExecutorService instanceNotificationExecutor =
+       IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+   // The operator is created inside the try so that a failing plan generation or type assertion
+   // still closes the operator (if any) and shuts the executor down.
+   Operator operator = null;
+   try {
+     operator = getOperator(node, instanceNotificationExecutor);
+     assertTrue(operator instanceof DeviceIteratorScanOperator);
+     checkResult(operator, outputColumnList, 30);
+   } catch (Exception e) {
+     e.printStackTrace();
+     fail(e.getMessage());
+   } finally {
+     try {
+       if (operator != null) {
+         operator.close();
+       }
+     } finally {
+       instanceNotificationExecutor.shutdown();
+     }
+   }
+ }
+
+ /**
+  * Uses the predicate {@code sensor1 > 1000 OR sensor2 > 1000} with offset 500 and limit 500;
+  * expects the root to be a {@link LimitOperator} returning 500 rows.
+  */
+ @Test
+ public void testScanWithCannotPushDownPredicateAndLimitAndOffset2() throws Exception {
+   List<String> outputColumnList = Arrays.asList("sensor0", "sensor1", "sensor2", "time", "tag1");
+   TreeNonAlignedDeviceViewScanNode node = getTreeNonAlignedDeviceViewScanNode(outputColumnList);
+   node.setPushDownOffset(500);
+   node.setPushDownLimit(500);
+   node.setPushDownPredicate(
+       new LogicalExpression(
+           LogicalExpression.Operator.OR,
+           Arrays.asList(
+               new ComparisonExpression(
+                   ComparisonExpression.Operator.GREATER_THAN,
+                   new Symbol("sensor1").toSymbolReference(),
+                   new LongLiteral("1000")),
+               new ComparisonExpression(
+                   ComparisonExpression.Operator.GREATER_THAN,
+                   new Symbol("sensor2").toSymbolReference(),
+                   new LongLiteral("1000")))));
+   ExecutorService instanceNotificationExecutor =
+       IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+   // The operator is created inside the try so that a failing plan generation or type assertion
+   // still closes the operator (if any) and shuts the executor down.
+   Operator operator = null;
+   try {
+     operator = getOperator(node, instanceNotificationExecutor);
+     assertTrue(operator instanceof LimitOperator);
+     checkResult(operator, outputColumnList, 500);
+   } catch (Exception e) {
+     e.printStackTrace();
+     fail(e.getMessage());
+   } finally {
+     try {
+       if (operator != null) {
+         operator.close();
+       }
+     } finally {
+       instanceNotificationExecutor.shutdown();
+     }
+   }
+ }
+
+ /**
+  * Sets only a limit of 500 (no predicate); expects the root to be a {@link LimitOperator}
+  * returning 500 rows.
+  */
+ @Test
+ public void testScanWithLimit() throws Exception {
+   List<String> outputColumnList = Arrays.asList("sensor0", "sensor1", "sensor2", "time", "tag1");
+   TreeNonAlignedDeviceViewScanNode node = getTreeNonAlignedDeviceViewScanNode(outputColumnList);
+   node.setPushDownLimit(500);
+   ExecutorService instanceNotificationExecutor =
+       IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+   // The operator is created inside the try so that a failing plan generation or type assertion
+   // still closes the operator (if any) and shuts the executor down.
+   Operator operator = null;
+   try {
+     operator = getOperator(node, instanceNotificationExecutor);
+     assertTrue(operator instanceof LimitOperator);
+     checkResult(operator, outputColumnList, 500);
+   } catch (Exception e) {
+     e.printStackTrace();
+     fail(e.getMessage());
+   } finally {
+     try {
+       if (operator != null) {
+         operator.close();
+       }
+     } finally {
+       instanceNotificationExecutor.shutdown();
+     }
+   }
+ }
+
+ /**
+  * Sets a per-device limit of 500 (no predicate); expects the root to be a {@link
+  * DeviceIteratorScanOperator} returning 1500 rows.
+  */
+ @Test
+ public void testScanWithPushLimitToEachDevice() throws Exception {
+   List<String> outputColumnList = Arrays.asList("sensor0", "sensor1", "sensor2", "time", "tag1");
+   TreeNonAlignedDeviceViewScanNode node = getTreeNonAlignedDeviceViewScanNode(outputColumnList);
+   node.setPushDownLimit(500);
+   node.setPushLimitToEachDevice(true);
+   ExecutorService instanceNotificationExecutor =
+       IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+   // The operator is created inside the try so that a failing plan generation or type assertion
+   // still closes the operator (if any) and shuts the executor down.
+   Operator operator = null;
+   try {
+     operator = getOperator(node, instanceNotificationExecutor);
+     assertTrue(operator instanceof DeviceIteratorScanOperator);
+     checkResult(operator, outputColumnList, 1500);
+   } catch (Exception e) {
+     e.printStackTrace();
+     fail(e.getMessage());
+   } finally {
+     try {
+       if (operator != null) {
+         operator.close();
+       }
+     } finally {
+       instanceNotificationExecutor.shutdown();
+     }
+   }
+ }
+
+ /**
+  * Outputs a single field column ({@code sensor0}) with predicate {@code sensor0 > 1000} and
+  * limit 500; expects the root to be a {@link DeviceIteratorScanOperator} returning 500 rows.
+  */
+ @Test
+ public void testScanWithOneFieldColumn() throws Exception {
+   List<String> outputColumnList = Arrays.asList("sensor0", "time", "tag1");
+   TreeNonAlignedDeviceViewScanNode node = getTreeNonAlignedDeviceViewScanNode(outputColumnList);
+   node.setPushDownLimit(500);
+   node.setPushDownPredicate(
+       new ComparisonExpression(
+           ComparisonExpression.Operator.GREATER_THAN,
+           new Symbol("sensor0").toSymbolReference(),
+           new LongLiteral("1000")));
+   ExecutorService instanceNotificationExecutor =
+       IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+   // The operator is created inside the try so that a failing plan generation or type assertion
+   // still closes the operator (if any) and shuts the executor down.
+   Operator operator = null;
+   try {
+     operator = getOperator(node, instanceNotificationExecutor);
+     assertTrue(operator instanceof DeviceIteratorScanOperator);
+     checkResult(operator, outputColumnList, 500);
+   } catch (Exception e) {
+     e.printStackTrace();
+     fail(e.getMessage());
+   } finally {
+     try {
+       if (operator != null) {
+         operator.close();
+       }
+     } finally {
+       instanceNotificationExecutor.shutdown();
+     }
+   }
+ }
+
+ /**
+  * Outputs a single field column ({@code sensor0}) with predicate {@code sensor0 > 1000} and a
+  * per-device limit of 500; expects the root to be a {@link DeviceIteratorScanOperator} returning
+  * 1320 rows.
+  */
+ @Test
+ public void testScanWithOneFieldColumnAndPushLimitToEachDevice() throws Exception {
+   List<String> outputColumnList = Arrays.asList("sensor0", "time", "tag1");
+   TreeNonAlignedDeviceViewScanNode node = getTreeNonAlignedDeviceViewScanNode(outputColumnList);
+   node.setPushDownLimit(500);
+   node.setPushLimitToEachDevice(true);
+   node.setPushDownPredicate(
+       new ComparisonExpression(
+           ComparisonExpression.Operator.GREATER_THAN,
+           new Symbol("sensor0").toSymbolReference(),
+           new LongLiteral("1000")));
+   ExecutorService instanceNotificationExecutor =
+       IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+   // The operator is created inside the try so that a failing plan generation or type assertion
+   // still closes the operator (if any) and shuts the executor down.
+   Operator operator = null;
+   try {
+     operator = getOperator(node, instanceNotificationExecutor);
+     assertTrue(operator instanceof DeviceIteratorScanOperator);
+     checkResult(operator, outputColumnList, 1320);
+   } catch (Exception e) {
+     e.printStackTrace();
+     fail(e.getMessage());
+   } finally {
+     try {
+       if (operator != null) {
+         operator.close();
+       }
+     } finally {
+       instanceNotificationExecutor.shutdown();
+     }
+   }
+ }
+
+ /**
+  * Sets only an offset of 1200 (no predicate); expects the root to be an {@link OffsetOperator}
+  * returning 300 rows.
+  */
+ @Test
+ public void testScanWithOffset() throws Exception {
+   List<String> outputColumnList = Arrays.asList("sensor0", "sensor1", "sensor2", "time", "tag1");
+   TreeNonAlignedDeviceViewScanNode node = getTreeNonAlignedDeviceViewScanNode(outputColumnList);
+   node.setPushDownOffset(1200);
+   ExecutorService instanceNotificationExecutor =
+       IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+   // The operator is created inside the try so that a failing plan generation or type assertion
+   // still closes the operator (if any) and shuts the executor down.
+   Operator operator = null;
+   try {
+     operator = getOperator(node, instanceNotificationExecutor);
+     assertTrue(operator instanceof OffsetOperator);
+     checkResult(operator, outputColumnList, 300);
+   } catch (Exception e) {
+     e.printStackTrace();
+     fail(e.getMessage());
+   } finally {
+     try {
+       if (operator != null) {
+         operator.close();
+       }
+     } finally {
+       instanceNotificationExecutor.shutdown();
+     }
+   }
+ }
+
+ /**
+  * Pushes down {@code sensor1 > 0} with offset 1200; expects the root to be a {@link
+  * DeviceIteratorScanOperator} returning 300 rows.
+  */
+ @Test
+ public void testScanWithPushDownPredicateAndOffset1() throws Exception {
+   List<String> outputColumnList = Arrays.asList("sensor0", "sensor1", "sensor2", "time", "tag1");
+   TreeNonAlignedDeviceViewScanNode node = getTreeNonAlignedDeviceViewScanNode(outputColumnList);
+   node.setPushDownOffset(1200);
+   node.setPushDownPredicate(
+       new ComparisonExpression(
+           ComparisonExpression.Operator.GREATER_THAN,
+           new Symbol("sensor1").toSymbolReference(),
+           new LongLiteral("0")));
+   ExecutorService instanceNotificationExecutor =
+       IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+   // The operator is created inside the try so that a failing plan generation or type assertion
+   // still closes the operator (if any) and shuts the executor down.
+   Operator operator = null;
+   try {
+     operator = getOperator(node, instanceNotificationExecutor);
+     assertTrue(operator instanceof DeviceIteratorScanOperator);
+     checkResult(operator, outputColumnList, 300);
+   } catch (Exception e) {
+     e.printStackTrace();
+     fail(e.getMessage());
+   } finally {
+     try {
+       if (operator != null) {
+         operator.close();
+       }
+     } finally {
+       instanceNotificationExecutor.shutdown();
+     }
+   }
+ }
+
+ /**
+  * Pushes down {@code sensor1 > 0 AND sensor2 > 0} with offset 1200; expects the root to be a
+  * {@link DeviceIteratorScanOperator} returning 300 rows.
+  */
+ @Test
+ public void testScanWithPushDownPredicateAndOffset2() throws Exception {
+   List<String> outputColumnList = Arrays.asList("sensor0", "sensor1", "sensor2", "time", "tag1");
+   TreeNonAlignedDeviceViewScanNode node = getTreeNonAlignedDeviceViewScanNode(outputColumnList);
+   node.setPushDownOffset(1200);
+   node.setPushDownPredicate(
+       new LogicalExpression(
+           LogicalExpression.Operator.AND,
+           Arrays.asList(
+               new ComparisonExpression(
+                   ComparisonExpression.Operator.GREATER_THAN,
+                   new Symbol("sensor1").toSymbolReference(),
+                   new LongLiteral("0")),
+               new ComparisonExpression(
+                   ComparisonExpression.Operator.GREATER_THAN,
+                   new Symbol("sensor2").toSymbolReference(),
+                   new LongLiteral("0")))));
+   ExecutorService instanceNotificationExecutor =
+       IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+   // The operator is created inside the try so that a failing plan generation or type assertion
+   // still closes the operator (if any) and shuts the executor down.
+   Operator operator = null;
+   try {
+     operator = getOperator(node, instanceNotificationExecutor);
+     assertTrue(operator instanceof DeviceIteratorScanOperator);
+     checkResult(operator, outputColumnList, 300);
+   } catch (Exception e) {
+     e.printStackTrace();
+     fail(e.getMessage());
+   } finally {
+     try {
+       if (operator != null) {
+         operator.close();
+       }
+     } finally {
+       instanceNotificationExecutor.shutdown();
+     }
+   }
+ }
+
+ /**
+  * Filters on {@code sensor3 > 1000}, a column that is assigned but not output, without limit or
+  * offset; expects 1320 rows. The type of the root operator is not asserted here.
+  */
+ @Test
+ public void testScanWithPushDownPredicate() throws Exception {
+   List<String> outputColumnList = Arrays.asList("sensor0", "sensor1", "sensor2", "time", "tag1");
+   TreeNonAlignedDeviceViewScanNode node =
+       getTreeNonAlignedDeviceViewScanNode(
+           outputColumnList,
+           Arrays.asList("sensor0", "sensor1", "sensor2", "time", "tag1", "sensor3"));
+   node.setPushDownPredicate(
+       new ComparisonExpression(
+           ComparisonExpression.Operator.GREATER_THAN,
+           new Symbol("sensor3").toSymbolReference(),
+           new LongLiteral("1000")));
+   ExecutorService instanceNotificationExecutor =
+       IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+   // The operator is created inside the try so that a failing plan generation still shuts the
+   // executor down.
+   Operator operator = null;
+   try {
+     operator = getOperator(node, instanceNotificationExecutor);
+     checkResult(operator, outputColumnList, 1320);
+   } catch (Exception e) {
+     e.printStackTrace();
+     fail(e.getMessage());
+   } finally {
+     try {
+       if (operator != null) {
+         operator.close();
+       }
+     } finally {
+       instanceNotificationExecutor.shutdown();
+     }
+   }
+ }
+
+ /**
+  * Checks the expression helpers on {@code (sensor0 > 1000 OR sensor1 > 1000) AND sensor2 > 1000
+  * AND sensor3 > 1000}: three top-level conjuncts and four distinct symbols are expected.
+  */
+ @Test
+ public void testUtils() {
+   Expression sensor0Filter =
+       new ComparisonExpression(
+           ComparisonExpression.Operator.GREATER_THAN,
+           new Symbol("sensor0").toSymbolReference(),
+           new LongLiteral("1000"));
+   Expression sensor1Filter =
+       new ComparisonExpression(
+           ComparisonExpression.Operator.GREATER_THAN,
+           new Symbol("sensor1").toSymbolReference(),
+           new LongLiteral("1000"));
+   Expression sensor0OrSensor1 =
+       new LogicalExpression(
+           LogicalExpression.Operator.OR, Arrays.asList(sensor0Filter, sensor1Filter));
+   Expression sensor2Filter =
+       new ComparisonExpression(
+           ComparisonExpression.Operator.GREATER_THAN,
+           new Symbol("sensor2").toSymbolReference(),
+           new LongLiteral("1000"));
+   Expression sensor3Filter =
+       new ComparisonExpression(
+           ComparisonExpression.Operator.GREATER_THAN,
+           new Symbol("sensor3").toSymbolReference(),
+           new LongLiteral("1000"));
+   Expression expression =
+       new LogicalExpression(
+           LogicalExpression.Operator.AND,
+           Arrays.asList(sensor0OrSensor1, sensor2Filter, sensor3Filter));
+
+   // The OR group counts as a single conjunct next to the two plain comparisons.
+   List<Expression> conjuncts = IrUtils.extractConjuncts(expression);
+   assertEquals(3, conjuncts.size());
+
+   Set<Symbol> symbols = SymbolsExtractor.extractUnique(expression);
+   assertEquals(4, symbols.size());
+   for (String sensorName : Arrays.asList("sensor0", "sensor1", "sensor2", "sensor3")) {
+     assertTrue(symbols.contains(new Symbol(sensorName)));
+   }
+ }
+
+ /**
+  * Builds the operator tree for {@code node} through {@code
+  * TableOperatorGenerator.visitTreeNonAlignedDeviceViewScan} and initializes every generated
+  * source operator with a {@link QueryDataSource} over the test files.
+  *
+  * @param node scan node to translate
+  * @param instanceNotificationExecutor executor handed to the fragment instance state machine
+  * @return the root operator returned by the generator
+  */
+ private Operator getOperator(
+     TreeNonAlignedDeviceViewScanNode node, ExecutorService instanceNotificationExecutor) {
+   FragmentInstanceId instanceId =
+       new FragmentInstanceId(new PlanFragmentId(new QueryId("stub_query"), 0), "stub-instance");
+   FragmentInstanceContext fragmentInstanceContext =
+       createFragmentInstanceContext(
+           instanceId, new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor));
+
+   DataDriverContext driverContext = new DataDriverContext(fragmentInstanceContext, 0);
+   driverContext.addOperatorContext(
+       1, new PlanNodeId("1"), DeviceIteratorScanOperator.class.getSimpleName());
+
+   LocalExecutionPlanContext planContext =
+       new LocalExecutionPlanContext(
+           typeProvider, fragmentInstanceContext, new DataNodeQueryContext(1));
+   Operator rootOperator =
+       tableOperatorGenerator.visitTreeNonAlignedDeviceViewScan(node, planContext);
+
+   // The source operators are registered on the driver context owned by the plan context.
+   DataDriverContext generatedDriverContext = (DataDriverContext) planContext.getDriverContext();
+   generatedDriverContext
+       .getSourceOperators()
+       .forEach(
+           source ->
+               source.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)));
+   return rootOperator;
+ }
+
+ /**
+  * Drains {@code operator} and verifies the shape of every returned block as well as the total
+  * number of rows.
+  *
+  * @param operator operator to drain; it is not closed here
+  * @param outputColumnList expected output columns, in order; each must be a key of {@code
+  *     columnSchemaMap}
+  * @param expectedCount expected total number of rows over all returned blocks
+  * @throws Exception if waiting on the operator's blocked future or reading from it fails
+  */
+ private void checkResult(Operator operator, List<String> outputColumnList, int expectedCount)
+     throws Exception {
+   int count = 0;
+   while (true) {
+     // Wait until the operator is no longer blocked. The previous loop condition compared the
+     // value returned by get() with the NOT_BLOCKED sentinel; NOT_BLOCKED looks like the future
+     // itself rather than its result, so that comparison was presumably always true and only the
+     // wait mattered.
+     operator.isBlocked().get();
+     if (!operator.hasNext()) {
+       break;
+     }
+     TsBlock tsBlock = operator.next();
+     if (tsBlock == null) {
+       // No block was produced on this call; ask again.
+       continue;
+     }
+     assertEquals(outputColumnList.size(), tsBlock.getValueColumnCount());
+     for (int i = 0; i < outputColumnList.size(); i++) {
+       Symbol symbol = new Symbol(outputColumnList.get(i));
+       assertEquals(
+           columnSchemaMap.get(symbol).getType(),
+           TypeFactory.getType(tsBlock.getColumn(i).getDataType()));
+     }
+     count += tsBlock.getPositionCount();
+   }
+   assertEquals(expectedCount, count);
+ }
+
+  /**
+   * Convenience overload that uses {@code outputColumns} both as the output columns and as the
+   * assignment columns of the scan node.
+   */
+  private TreeNonAlignedDeviceViewScanNode getTreeNonAlignedDeviceViewScanNode(
+      List<String> outputColumns) {
+    List<String> assignmentColumns = outputColumns;
+    return getTreeNonAlignedDeviceViewScanNode(outputColumns, assignmentColumns);
+  }
+
+  /**
+   * Builds the {@link TreeNonAlignedDeviceViewScanNode} used by these tests: plan node id "1",
+   * table "table1" in the lower-cased test database, {@link Ordering#ASC}, no time or push-down
+   * predicate, limit and offset 0, and {@code containsNonAlignedDevice = true}.
+   *
+   * @param outputColumns names of the output symbols, in order
+   * @param assignmentColumns names whose schemas are looked up in {@code columnSchemaMap} and
+   *     placed in the node's assignments
+   * @return the configured scan node
+   */
+  private TreeNonAlignedDeviceViewScanNode getTreeNonAlignedDeviceViewScanNode(
+      List<String> outputColumns, List<String> assignmentColumns) {
+    List<Symbol> outputSymbols =
+        outputColumns.stream().map(name -> new Symbol(name)).collect(Collectors.toList());
+
+    Map<Symbol, ColumnSchema> assignments = new HashMap<>();
+    assignmentColumns.forEach(
+        name -> {
+          Symbol key = new Symbol(name);
+          assignments.put(key, columnSchemaMap.get(key));
+        });
+
+    Map<Symbol, Integer> idAndAttributeIndexMap = new HashMap<>();
+    idAndAttributeIndexMap.put(new Symbol("tag1"), 0);
+
+    // NOTE(review): ".device1" is listed twice below; confirm whether the third entry was meant
+    // to be a different device. It is kept as-is here.
+    String devicePrefix = NON_ALIGNED_TREE_DEVICE_VIEW_SCAN_OPERATOR_TREE_TEST + ".";
+    List<DeviceEntry> deviceEntries =
+        Arrays.asList(
+            new NonAlignedAlignedDeviceEntry(
+                IDeviceID.Factory.DEFAULT_FACTORY.create(devicePrefix + "device0"),
+                new Binary[0]),
+            new NonAlignedAlignedDeviceEntry(
+                IDeviceID.Factory.DEFAULT_FACTORY.create(devicePrefix + "device1"),
+                new Binary[0]),
+            new NonAlignedAlignedDeviceEntry(
+                IDeviceID.Factory.DEFAULT_FACTORY.create(devicePrefix + "device1"),
+                new Binary[0]));
+
+    // Named locals document the positional constructor arguments below.
+    QualifiedObjectName tableName =
+        new QualifiedObjectName(
+            NON_ALIGNED_TREE_DEVICE_VIEW_SCAN_OPERATOR_TREE_TEST.toLowerCase(), "table1");
+    Expression timePredicate = null;
+    Expression pushDownPredicate = null;
+    long pushDownLimit = 0;
+    long pushDownOffset = 0;
+    boolean pushLimitToEachDevice = false;
+    boolean containsNonAlignedDevice = true;
+    String treeDBName = NON_ALIGNED_TREE_DEVICE_VIEW_SCAN_OPERATOR_TREE_TEST;
+    Map<String, String> measurementColumnNameMap = new HashMap<>();
+
+    return new TreeNonAlignedDeviceViewScanNode(
+        new PlanNodeId("1"),
+        tableName,
+        outputSymbols,
+        assignments,
+        deviceEntries,
+        idAndAttributeIndexMap,
+        Ordering.ASC,
+        timePredicate,
+        pushDownPredicate,
+        pushDownLimit,
+        pushDownOffset,
+        pushLimitToEachDevice,
+        containsNonAlignedDevice,
+        treeDBName,
+        measurementColumnNameMap);
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeToTableViewAdaptorOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeToTableViewAdaptorOperatorTest.java
new file mode 100644
index 00000000000..8ef662e9db9
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeToTableViewAdaptorOperatorTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.AlignedFullPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeToTableViewAdaptorOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.TableOperatorGenerator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import io.airlift.units.Duration;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.tsfile.read.common.block.column.BooleanColumn;
+import org.apache.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.tsfile.read.common.block.column.FloatColumn;
+import org.apache.tsfile.read.common.block.column.IntColumn;
+import org.apache.tsfile.read.common.block.column.LongColumn;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.read.common.block.column.TimeColumn;
+import org.apache.tsfile.read.common.type.TypeFactory;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions.getDefaultSeriesScanOptions;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Checks that {@link TreeToTableViewAdaptorOperator}, layered on an {@link
+ * AlignedSeriesScanOperator} reading {@code device0}, emits the measurement columns followed by
+ * a tag column, an attribute column and a time column. The fixture files are created and removed
+ * by {@code AlignedSeriesTestUtil}.
+ */
+public class TreeToTableViewAdaptorOperatorTest {
+  private static final String TREE_TO_TABLE_VIEW_ADAPTOR_OPERATOR_TEST =
+      "root.TreeToTableViewAdaptorOperatorTest";
+  private static final List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private static final List<TsFileResource> seqResources = new ArrayList<>();
+  private static final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+  private static final double DELTA = 0.000001;
+
+  /** Expected concrete column class for each output position of the adaptor operator. */
+  private static final Class<?>[] EXPECTED_COLUMN_CLASSES = {
+    BooleanColumn.class,
+    IntColumn.class,
+    LongColumn.class,
+    FloatColumn.class,
+    DoubleColumn.class,
+    BinaryColumn.class,
+    RunLengthEncodedColumn.class,
+    RunLengthEncodedColumn.class,
+    TimeColumn.class
+  };
+
+  @BeforeClass
+  public static void setUp() throws MetadataException, IOException, WriteProcessException {
+    AlignedSeriesTestUtil.setUp(
+        measurementSchemas, seqResources, unSeqResources, TREE_TO_TABLE_VIEW_ADAPTOR_OPERATOR_TEST);
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    AlignedSeriesTestUtil.tearDown(seqResources, unSeqResources);
+  }
+
+  /**
+   * Scans {@code device0} through the adaptor operator and verifies the column layout and the
+   * value of every column for each of the 500 expected rows.
+   */
+  @Test
+  public void test1() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    Operator operator = null;
+    try {
+      IDeviceID deviceId =
+          IDeviceID.Factory.DEFAULT_FACTORY.create(
+              TREE_TO_TABLE_VIEW_ADAPTOR_OPERATOR_TEST + ".device0");
+      List<String> measurementNames =
+          measurementSchemas.stream()
+              .map(IMeasurementSchema::getMeasurementName)
+              .collect(Collectors.toList());
+      List<IMeasurementSchema> schemaCopy = new ArrayList<>(measurementSchemas);
+      AlignedFullPath alignedPath = new AlignedFullPath(deviceId, measurementNames, schemaCopy);
+
+      PlanFragmentId fragmentId = new PlanFragmentId(new QueryId("stub_query"), 0);
+      FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "stub-instance");
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(
+              instanceId,
+              new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor));
+      DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      driverContext.addOperatorContext(
+          1, planNodeId, AlignedSeriesScanOperator.class.getSimpleName());
+
+      AlignedSeriesScanOperator scanOperator =
+          new AlignedSeriesScanOperator(
+              driverContext.getOperatorContexts().get(0),
+              planNodeId,
+              alignedPath,
+              Ordering.ASC,
+              getDefaultSeriesScanOptions(alignedPath),
+              false,
+              null,
+              -1);
+      scanOperator.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      scanOperator.getOperatorContext().setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      // Output layout: one FIELD column per measurement, then tag, attr and time. Each
+      // columnIndexArray slot is written at the position the next schema is about to occupy.
+      int totalColumns = measurementSchemas.size() + 3;
+      List<ColumnSchema> columnSchemas = new ArrayList<>(totalColumns);
+      int[] columnIndexArray = new int[totalColumns];
+      for (IMeasurementSchema field : measurementSchemas) {
+        columnIndexArray[columnSchemas.size()] = columnSchemas.size();
+        columnSchemas.add(
+            column(field.getMeasurementName(), field.getType(), TsTableColumnCategory.FIELD));
+      }
+      columnIndexArray[columnSchemas.size()] = 0;
+      columnSchemas.add(column("tag", TSDataType.STRING, TsTableColumnCategory.TAG));
+      columnIndexArray[columnSchemas.size()] = 0;
+      columnSchemas.add(column("attr", TSDataType.STRING, TsTableColumnCategory.ATTRIBUTE));
+      columnIndexArray[columnSchemas.size()] = -1;
+      columnSchemas.add(column("time", TSDataType.TIMESTAMP, TsTableColumnCategory.TIME));
+
+      operator =
+          new TreeToTableViewAdaptorOperator(
+              driverContext.addOperatorContext(
+                  2, planNodeId, TreeToTableViewAdaptorOperator.class.getSimpleName()),
+              new AlignedDeviceEntry(
+                  alignedPath.getDeviceId(),
+                  new Binary[] {new Binary("attr1", TSFileConfig.STRING_CHARSET)}),
+              columnIndexArray,
+              columnSchemas,
+              scanOperator,
+              TableOperatorGenerator.createTreeDeviceIdColumnValueExtractor(
+                  TREE_TO_TABLE_VIEW_ADAPTOR_OPERATOR_TEST));
+
+      int count = 0;
+      while (operator.hasNext()) {
+        TsBlock tsBlock = operator.next();
+        if (tsBlock != null) {
+          assertColumnLayout(tsBlock, columnSchemas.size());
+          for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+            assertRow(tsBlock, i, count);
+            count++;
+          }
+        }
+      }
+      Assert.assertEquals(500, count);
+    } catch (Exception e) {
+      // Unexpected (non-assertion) failures are printed and turned into a test failure.
+      e.printStackTrace();
+      fail();
+    } finally {
+      if (operator != null) {
+        try {
+          operator.close();
+        } catch (Exception ignored) {
+        }
+      }
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /** Creates a {@link ColumnSchema} with the given name, data type and category. */
+  private static ColumnSchema column(
+      String name, TSDataType dataType, TsTableColumnCategory category) {
+    return new ColumnSchema(name, TypeFactory.getType(dataType), false, category);
+  }
+
+  /** Asserts the value-column count and the concrete class of every output column. */
+  private static void assertColumnLayout(TsBlock tsBlock, int expectedColumnCount) {
+    assertEquals(expectedColumnCount, tsBlock.getValueColumnCount());
+    for (int c = 0; c < EXPECTED_COLUMN_CLASSES.length; c++) {
+      assertTrue(EXPECTED_COLUMN_CLASSES[c].isInstance(tsBlock.getColumn(c)));
+    }
+  }
+
+  /**
+   * Offset added to the row number to obtain the expected measurement value. The ranges mirror
+   * the fixture data written by {@code AlignedSeriesTestUtil} (not visible in this file).
+   */
+  private static int expectedDelta(int row) {
+    if (row < 200) {
+      return 20000;
+    }
+    if (row < 260 || (row >= 300 && row < 380) || row >= 400) {
+      return 10000;
+    }
+    return 0;
+  }
+
+  /**
+   * Asserts every column of position {@code i} in {@code tsBlock}, where {@code row} is the
+   * running row number across all blocks (and therefore also the expected timestamp).
+   */
+  private static void assertRow(TsBlock tsBlock, int i, int row) {
+    assertEquals(row, tsBlock.getColumn(8).getLong(i));
+    long expected = expectedDelta(row) + (long) row;
+    assertEquals(expected % 2 == 0, tsBlock.getColumn(0).getBoolean(i));
+    assertEquals(expected, tsBlock.getColumn(1).getInt(i));
+    assertEquals(expected, tsBlock.getColumn(2).getLong(i));
+    assertEquals(expected, tsBlock.getColumn(3).getFloat(i), DELTA);
+    assertEquals(expected, tsBlock.getColumn(4).getDouble(i), DELTA);
+    assertEquals(String.valueOf(expected), tsBlock.getColumn(5).getBinary(i).toString());
+    assertEquals("device0", tsBlock.getColumn(6).getBinary(i).toString());
+    assertEquals("attr1", tsBlock.getColumn(7).getBinary(i).toString());
+  }
+}