This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/last_cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/beyyes/last_cache by this push:
new 93b0bc70f1b fix
93b0bc70f1b is described below
commit 93b0bc70f1bfd391c8c1447198b143635b89436e
Author: Beyyes <[email protected]>
AuthorDate: Sun Dec 22 17:03:13 2024 +0800
fix
---
.../TableAggregationTableScanOperator.java | 2 +
.../source/relational/TableLastQueryOperator.java | 212 ++++++++++++---------
.../planner/plan/parameter/SeriesScanOptions.java | 12 ++
3 files changed, 134 insertions(+), 92 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
index 6cb0e656ac0..bfeba58b265 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
@@ -93,7 +93,9 @@ public class TableAggregationTableScanOperator extends
AbstractDataSourceOperato
protected List<TSDataType> measurementColumnTSDataTypes;
protected int measurementCount;
+ // distinct column schemas appeared in aggregation function
protected List<ColumnSchema> aggColumnSchemas;
+ // length of aggColumnsIndexArray equals the size of aggColumnSchemas
protected int[] aggColumnsIndexArray;
protected SeriesScanOptions seriesScanOptions;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
index 30aa2933b0f..7e5e6abdd32 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
@@ -19,11 +19,9 @@
package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
-import org.apache.iotdb.commons.path.AlignedFullPath;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanUtil;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastByAccumulator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastByDescAccumulator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastDescAccumulator;
@@ -35,14 +33,12 @@ import
org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
-import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.TsBlockBuilder;
@@ -66,9 +62,9 @@ import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
-import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.Utils.serializeTimeValue;
import static
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR;
import static
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType;
@@ -132,7 +128,7 @@ public class TableLastQueryOperator extends
TableAggregationTableScanOperator {
context,
null,
null,
- Collections.emptyList(),
+ new ArrayList<>(1),
seriesScanOptions,
Collections.emptyList(),
null,
@@ -230,64 +226,69 @@ public class TableLastQueryOperator extends
TableAggregationTableScanOperator {
buildAggTableScanArguments();
}
- if (!aggTableScanOperator.hasNext()) {
- for (int i = 0; i < tableAggregators.size() - 1; i++) {
- resultTsBlockBuilder
- .getValueColumnBuilders()[groupKeySize +
unCachedMeasurementToAggregatorIndex.get(i)]
- .appendNull();
- }
- outputDeviceIndex++;
- fetchLastCacheForCurrentDevice = false;
- appendToResultTsBlockBuilder();
- return;
- }
-
- TsBlock updateBlock = aggTableScanOperator.next();
- if (updateBlock != null && !updateBlock.isEmpty()) {
+ // if (!aggTableScanOperator.hasNext()) {
+ // for (int i = 0; i < tableAggregators.size() - 1; i++) {
+ // resultTsBlockBuilder
+ // .getValueColumnBuilders()[groupKeySize +
+ // unCachedMeasurementToAggregatorIndex.get(i)]
+ // .appendNull();
+ // }
+ // outputDeviceIndex++;
+ // fetchLastCacheForCurrentDevice = false;
+ // appendToResultTsBlockBuilder();
+ // return;
+ // }
+
+ // TsBlock updateBlock = aggTableScanOperator.next();
+ if (calculateAggregationResultForCurrentTimeRange()) {
int channel = 0;
List<String> updateMeasurementList = new ArrayList<>();
List<TimeValuePair> updateTimeValuePairList = new ArrayList<>();
- for (int i = 0; i < tableAggregators.size() - 1; i++) {
- if (updateBlock.getColumn(i).isNull(0)) {
- resultTsBlockBuilder
- .getValueColumnBuilders()[
- groupKeySize + unCachedMeasurementToAggregatorIndex.get(i)]
- .appendNull();
- } else {
- resultTsBlockBuilder
- .getValueColumnBuilders()[
- groupKeySize + unCachedMeasurementToAggregatorIndex.get(i)]
- .writeBinary(updateBlock.getColumn(i).getBinary(0));
- }
+ for (int i = 0; i < tableAggregators.size(); i++) {
+ ColumnBuilder valueColumnBuilder =
+ resultTsBlockBuilder
+ .getValueColumnBuilders()[
+ groupKeySize + unCachedMeasurementToAggregatorIndex.get(i)];
+ // if (updateBlock.getColumn(i).isNull(0)) {
+ // valueColumnBuilder.appendNull();
+ // } else {
+ // valueColumnBuilder
+ //
.writeBinary(updateBlock.getColumn(i).getBinary(0));
+ tableAggregators.get(i).evaluate(valueColumnBuilder);
+ // }
boolean isLastBy =
tableAggregators.get(i).getAccumulator() instanceof
LastByDescAccumulator;
- if (!isLastBy || !updateBlock.getColumn(i).isNull(0)) {
- ColumnSchema schema =
aggColumnSchemas.get(aggregatorInputChannels.get(channel));
- if (schema.getColumnCategory() == TsTableColumnCategory.TIME
- || schema.getColumnCategory() ==
TsTableColumnCategory.MEASUREMENT) {
- updateMeasurementList.add(
- schema.getColumnCategory() == TsTableColumnCategory.TIME ?
"" : schema.getName());
- TimeValuePair tv =
- new TimeValuePair(
- updateBlock.getColumn(tableAggregators.size() -
1).getLong(0),
- updateBlock.getColumn(i).getTsPrimitiveType(0));
- updateTimeValuePairList.add(tv);
- }
- }
+ // if (!isLastBy || !updateBlock.getColumn(i).isNull(0)) {
+ // ColumnSchema schema =
+ // aggColumnSchemas.get(aggregatorInputChannels.get(channel));
+ // if (schema.getColumnCategory() ==
TsTableColumnCategory.TIME
+ // || schema.getColumnCategory() ==
TsTableColumnCategory.MEASUREMENT) {
+ // updateMeasurementList.add(
+ // schema.getColumnCategory() ==
TsTableColumnCategory.TIME ? "" :
+ // schema.getName());
+ // TimeValuePair tv =
+ // new TimeValuePair(
+ //
+ // (LastDescAccumulator)(tableAggregators.get(i).getAccumulator()).
+ //
updateBlock.getColumn(tableAggregators.size() - 1).getLong(0),
+ //
updateBlock.getColumn(i).getTsPrimitiveType(0));
+ // updateTimeValuePairList.add(tv);
+ // }
+ // }
}
String[] updateMeasurementArray = updateMeasurementList.toArray(new
String[0]);
TimeValuePair[] updateTimeValuePairArray =
updateTimeValuePairList.toArray(new TimeValuePair[0]);
- TABLE_DEVICE_SCHEMA_CACHE.initOrInvalidateLastCache(
- dbName, currentDeviceEntry.getDeviceID(), updateMeasurementArray,
false);
- TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
- dbName,
- currentDeviceEntry.getDeviceID(),
- updateMeasurementArray,
- updateTimeValuePairArray);
+ // TABLE_DEVICE_SCHEMA_CACHE.initOrInvalidateLastCache(
+ // dbName, currentDeviceEntry.getDeviceID(),
updateMeasurementArray, false);
+ // TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
+ // dbName,
+ // currentDeviceEntry.getDeviceID(),
+ // updateMeasurementArray,
+ // updateTimeValuePairArray);
outputDeviceIndex++;
fetchLastCacheForCurrentDevice = false;
@@ -328,10 +329,10 @@ public class TableLastQueryOperator extends
TableAggregationTableScanOperator {
}
return;
}
-
- unCachedMeasurementToAggregatorIndex.add(aggregatorNum);
}
+ unCachedMeasurementToAggregatorIndex.add(aggregatorNum);
+
TableAggregator newAggregator =
new TableAggregator(
aggregator.getAccumulator(),
@@ -343,14 +344,15 @@ public class TableLastQueryOperator extends
TableAggregationTableScanOperator {
// last_by always has three channels
for (int i = 0; i < 3; i++) {
- schema =
initAggColumnSchemas.get(initAggregatorInputChannels.get(channelNum + i));
+ int aggIdx = initAggregatorInputChannels.get(channelNum + i);
+ schema = initAggColumnSchemas.get(aggIdx);
columnName = schema.getName();
if (!aggColumnLayout.containsKey(columnName)) {
switch (schema.getColumnCategory()) {
case ID:
case ATTRIBUTE:
- newAggColumnsIndexArray.add(initAggColumnsIndexArray[channelNum]);
+ newAggColumnsIndexArray.add(initAggColumnsIndexArray[aggIdx]);
break;
case MEASUREMENT:
newAggColumnsIndexArray.add(measurementCount);
@@ -409,10 +411,10 @@ public class TableLastQueryOperator extends
TableAggregationTableScanOperator {
}
return;
}
-
- unCachedMeasurementToAggregatorIndex.add(aggregatorNum);
}
+ unCachedMeasurementToAggregatorIndex.add(aggregatorNum);
+
TableAggregator newAggregator =
new TableAggregator(
aggregator.getAccumulator(),
@@ -424,14 +426,15 @@ public class TableLastQueryOperator extends
TableAggregationTableScanOperator {
// last_by always has two channels
for (int i = 0; i < 2; i++) {
- schema =
initAggColumnSchemas.get(initAggregatorInputChannels.get(channelNum + i));
+ int aggIdx = initAggregatorInputChannels.get(channelNum + i);
+ schema = initAggColumnSchemas.get(aggIdx);
columnName = schema.getName();
if (!aggColumnLayout.containsKey(columnName)) {
switch (schema.getColumnCategory()) {
case ID:
case ATTRIBUTE:
- newAggColumnsIndexArray.add(initAggColumnsIndexArray[channelNum]);
+ newAggColumnsIndexArray.add(initAggColumnsIndexArray[aggIdx]);
break;
case MEASUREMENT:
newAggColumnsIndexArray.add(measurementCount);
@@ -464,12 +467,28 @@ public class TableLastQueryOperator extends
TableAggregationTableScanOperator {
}
private void buildAggTableScanArguments() {
- addLastTimeAggregationToAggregators();
+ // addLastTimeAggregationToAggregators();
aggColumnsIndexArray =
newAggColumnsIndexArray.stream().mapToInt(Integer::intValue).toArray();
allSensors = new HashSet<>(measurementColumnNames);
allSensors.add("");
+ // seriesScanOptions =
+ // new SeriesScanOptions(
+ // seriesScanOptions.getGlobalTimeFilter(),
+ // seriesScanOptions.getPushDownFilter(),
+ // seriesScanOptions.getPushDownLimit(),
+ // seriesScanOptions.getPushDownOffset(),
+ // allSensors,
+ // seriesScanOptions.getPushLimitToEachDevice());
+ deviceEntries.clear();
+ deviceEntries.add(currentDeviceEntry);
+
+ this.measurementColumnTSDataTypes =
+
measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
+ constructAlignedSeriesScanUtil();
+ this.seriesScanUtil.initQueryDataSource(queryDataSource);
+
aggTableScanOperator =
new TableAggregationTableScanOperator(
sourceId,
@@ -489,7 +508,7 @@ public class TableLastQueryOperator extends
TableAggregationTableScanOperator {
canUseStatistics,
aggregatorInputChannels);
- aggTableScanOperator.initQueryDataSource(this.queryDataSource);
+ // aggTableScanOperator.initQueryDataSource(this.queryDataSource);
}
private void addLastTimeAggregationToAggregators() {
@@ -498,15 +517,22 @@ public class TableLastQueryOperator extends
TableAggregationTableScanOperator {
tableAggregators.add(
new TableAggregator(
new LastDescAccumulator(TSDataType.TIMESTAMP),
- AggregationNode.Step.FINAL,
+ AggregationNode.Step.SINGLE,
TSDataType.TIMESTAMP,
- Collections.singletonList(timeColumnIdx),
+ Arrays.asList(timeColumnIdx, timeColumnIdx),
OptionalInt.empty()));
aggregatorInputChannels.add(timeColumnIdx);
aggregatorInputChannels.add(timeColumnIdx);
}
+ @Override
+ protected void updateResultTsBlock() {
+ // appendAggregationResult(resultTsBlockBuilder, tableAggregators);
+ // after appendAggregationResult invoked, aggregators must be cleared
+ // resetTableAggregators();
+ }
+
private void appendToResultTsBlockBuilder() {
ColumnBuilder[] columnBuilders =
resultTsBlockBuilder.getValueColumnBuilders();
@@ -565,34 +591,36 @@ public class TableLastQueryOperator extends
TableAggregationTableScanOperator {
resultTsBlockBuilder.reset();
}
- @Override
- protected void constructAlignedSeriesScanUtil() {
- // TODO?
- // if (this.deviceEntries == null || this.deviceEntries.isEmpty()) {
- // return;
- // }
-
- DeviceEntry deviceEntry;
-
- if (this.deviceEntries.isEmpty() ||
this.deviceEntries.get(this.outputDeviceIndex) == null) {
- // for device which is not exist
- deviceEntry = new DeviceEntry(new StringArrayDeviceID(""),
Collections.emptyList());
- } else {
- deviceEntry = this.deviceEntries.get(this.outputDeviceIndex);
- }
-
- AlignedFullPath alignedPath =
- constructAlignedPath(deviceEntry, measurementColumnNames,
measurementSchemas, allSensors);
-
- this.seriesScanUtil =
- new AlignedSeriesScanUtil(
- alignedPath,
- Ordering.DESC,
- seriesScanOptions,
- operatorContext.getInstanceContext(),
- true,
- measurementColumnTSDataTypes);
- }
+ // @Override
+ // protected void constructAlignedSeriesScanUtil() {
+ // // TODO?
+ // // if (this.deviceEntries == null || this.deviceEntries.isEmpty()) {
+ // // return;
+ // // }
+ //
+ // DeviceEntry deviceEntry;
+ //
+ // if (this.deviceEntries.isEmpty() ||
this.deviceEntries.get(this.outputDeviceIndex) == null)
+ // {
+ // // for device which is not exist
+ // deviceEntry = new DeviceEntry(new StringArrayDeviceID(""),
Collections.emptyList());
+ // } else {
+ // deviceEntry = this.deviceEntries.get(this.outputDeviceIndex);
+ // }
+ //
+ // AlignedFullPath alignedPath =
+ // constructAlignedPath(deviceEntry, measurementColumnNames,
measurementSchemas,
+ // allSensors);
+ //
+ // this.seriesScanUtil =
+ // new AlignedSeriesScanUtil(
+ // alignedPath,
+ // Ordering.DESC,
+ // seriesScanOptions,
+ // operatorContext.getInstanceContext(),
+ // true,
+ // measurementColumnTSDataTypes);
+ // }
@Override
public List<TSDataType> getResultDataTypes() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
index 25853c9c547..79204bbee19 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
@@ -85,6 +85,18 @@ public class SeriesScanOptions {
return pushDownFilter;
}
+ public long getPushDownLimit() {
+ return this.pushDownLimit;
+ }
+
+ public long getPushDownOffset() {
+ return this.pushDownOffset;
+ }
+
+ public boolean getPushLimitToEachDevice() {
+ return this.pushLimitToEachDevice;
+ }
+
public Set<String> getAllSensors() {
return allSensors;
}