This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/last_cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/beyyes/last_cache by this push:
new 152396c9c3d fix groupKeySize + unCachedMeasurementToAggregatorIndex.get(i) in appendAggregationResult
152396c9c3d is described below
commit 152396c9c3d4b85a69e8de45fab8b268bb598ab6
Author: Beyyes <[email protected]>
AuthorDate: Mon Dec 23 15:57:48 2024 +0800
fix groupKeySize + unCachedMeasurementToAggregatorIndex.get(i) in appendAggregationResult
---
.../TableAggregationTableScanOperator.java | 7 +-
.../source/relational/TableLastQueryOperator.java | 365 ++++++++++-----------
.../relational/aggregation/LastAccumulator.java | 12 +
.../relational/aggregation/LastByAccumulator.java | 20 +-
.../aggregation/LastByDescAccumulator.java | 5 +
.../aggregation/LastDescAccumulator.java | 5 +
.../plan/planner/TableOperatorGenerator.java | 7 +
7 files changed, 233 insertions(+), 188 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
index bfeba58b265..1696db75e74 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
@@ -86,7 +86,7 @@ public class TableAggregationTableScanOperator extends
AbstractDataSourceOperato
protected final List<DeviceEntry> deviceEntries;
protected final int deviceCount;
- private int currentDeviceIndex;
+ protected int currentDeviceIndex;
protected List<String> measurementColumnNames;
protected Set<String> allSensors;
protected List<IMeasurementSchema> measurementSchemas;
@@ -121,6 +121,7 @@ public class TableAggregationTableScanOperator extends
AbstractDataSourceOperato
List<ColumnSchema> aggColumnSchemas,
int[] aggColumnsIndexArray,
List<DeviceEntry> deviceEntries,
+ int deviceCount,
SeriesScanOptions seriesScanOptions,
List<String> measurementColumnNames,
Set<String> allSensors,
@@ -142,7 +143,7 @@ public class TableAggregationTableScanOperator extends
AbstractDataSourceOperato
this.aggColumnSchemas = aggColumnSchemas;
this.aggColumnsIndexArray = aggColumnsIndexArray;
this.deviceEntries = deviceEntries;
- this.deviceCount = deviceEntries.size();
+ this.deviceCount = deviceCount;
this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER,
Integer.toString(this.deviceCount));
this.ascending = ascending;
this.scanOrder = ascending ? Ordering.ASC : Ordering.DESC;
@@ -822,7 +823,7 @@ public class TableAggregationTableScanOperator extends
AbstractDataSourceOperato
CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex));
}
- private void resetTableAggregators() {
+ protected void resetTableAggregators() {
tableAggregators.forEach(TableAggregator::reset);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
index 7e5e6abdd32..b9ef991ecb9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableLastQueryOperator.java
@@ -47,6 +47,7 @@ import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.TsPrimitiveType;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -79,9 +80,9 @@ public class TableLastQueryOperator extends
TableAggregationTableScanOperator {
private boolean finished = false;
private boolean fetchLastCacheForCurrentDevice;
+ private boolean hasBuildAggTableScanArguments;
private int outputDeviceIndex;
private DeviceEntry currentDeviceEntry;
- private TableAggregationTableScanOperator aggTableScanOperator;
private Map<String, Integer> aggColumnLayout;
private int newChannelCnt = 0;
private List<Integer> newAggColumnsIndexArray;
@@ -129,6 +130,7 @@ public class TableLastQueryOperator extends
TableAggregationTableScanOperator {
null,
null,
new ArrayList<>(1),
+ 1,
seriesScanOptions,
Collections.emptyList(),
null,
@@ -201,99 +203,97 @@ public class TableLastQueryOperator extends
TableAggregationTableScanOperator {
}
/** Main process logic, calc the last aggregation results of current device.
*/
- private void processCurrentDevice() throws Exception {
+ private void processCurrentDevice() {
currentDeviceEntry = initDeviceEntries.get(outputDeviceIndex);
if (!fetchLastCacheForCurrentDevice) {
resetAggArguments();
- int initChannel = 0;
- for (int i = 0; i < initTableAggregators.size(); i++) {
- TableAggregator aggregator = initTableAggregators.get(i);
- if (isLastByArray[i]) {
- processLastBy(i, initChannel);
+ int channelNum = 0;
+ for (int aggregatorNum = 0; aggregatorNum < initTableAggregators.size();
aggregatorNum++) {
+ TableAggregator aggregator = initTableAggregators.get(aggregatorNum);
+ if (isLastByArray[aggregatorNum]) {
+ processLastBy(aggregatorNum, channelNum);
} else {
- processLast(i, initChannel);
+ processLast(aggregatorNum, channelNum);
}
- initChannel += aggregator.getChannelCount();
+ channelNum += aggregator.getChannelCount();
}
fetchLastCacheForCurrentDevice = true;
}
if (hasUnCachedColumns()) {
- if (aggTableScanOperator == null) {
+ if (!hasBuildAggTableScanArguments) {
buildAggTableScanArguments();
}
- // if (!aggTableScanOperator.hasNext()) {
- // for (int i = 0; i < tableAggregators.size() - 1; i++) {
- // resultTsBlockBuilder
- // .getValueColumnBuilders()[groupKeySize +
- // unCachedMeasurementToAggregatorIndex.get(i)]
- // .appendNull();
- // }
- // outputDeviceIndex++;
- // fetchLastCacheForCurrentDevice = false;
- // appendToResultTsBlockBuilder();
- // return;
- // }
-
- // TsBlock updateBlock = aggTableScanOperator.next();
if (calculateAggregationResultForCurrentTimeRange()) {
int channel = 0;
List<String> updateMeasurementList = new ArrayList<>();
List<TimeValuePair> updateTimeValuePairList = new ArrayList<>();
- for (int i = 0; i < tableAggregators.size(); i++) {
- ColumnBuilder valueColumnBuilder =
- resultTsBlockBuilder
- .getValueColumnBuilders()[
- groupKeySize + unCachedMeasurementToAggregatorIndex.get(i)];
- // if (updateBlock.getColumn(i).isNull(0)) {
- // valueColumnBuilder.appendNull();
- // } else {
- // valueColumnBuilder
- //
.writeBinary(updateBlock.getColumn(i).getBinary(0));
- tableAggregators.get(i).evaluate(valueColumnBuilder);
- // }
-
- boolean isLastBy =
- tableAggregators.get(i).getAccumulator() instanceof
LastByDescAccumulator;
-
- // if (!isLastBy || !updateBlock.getColumn(i).isNull(0)) {
- // ColumnSchema schema =
- // aggColumnSchemas.get(aggregatorInputChannels.get(channel));
- // if (schema.getColumnCategory() ==
TsTableColumnCategory.TIME
- // || schema.getColumnCategory() ==
TsTableColumnCategory.MEASUREMENT) {
- // updateMeasurementList.add(
- // schema.getColumnCategory() ==
TsTableColumnCategory.TIME ? "" :
- // schema.getName());
- // TimeValuePair tv =
- // new TimeValuePair(
- //
- // (LastDescAccumulator)(tableAggregators.get(i).getAccumulator()).
- //
updateBlock.getColumn(tableAggregators.size() - 1).getLong(0),
- //
updateBlock.getColumn(i).getTsPrimitiveType(0));
- // updateTimeValuePairList.add(tv);
- // }
- // }
+ for (TableAggregator tableAggregator : tableAggregators) {
+ ColumnSchema schema =
aggColumnSchemas.get(aggregatorInputChannels.get(channel));
+ if (schema.getColumnCategory() != TsTableColumnCategory.TIME
+ && schema.getColumnCategory() !=
TsTableColumnCategory.MEASUREMENT) {
+ // only time and measurement column can update last cache
+ continue;
+ }
+
+ boolean isLastBy = tableAggregator.getAccumulator() instanceof
LastByDescAccumulator;
+ if (!isLastBy) {
+ LastDescAccumulator lastAccumulator =
+ (LastDescAccumulator) tableAggregator.getAccumulator();
+ if (lastAccumulator.hasInitResult()) {
+ updateMeasurementList.add(
+ schema.getColumnCategory() == TsTableColumnCategory.TIME ?
"" : schema.getName());
+ TimeValuePair tv =
+ new TimeValuePair(
+ lastAccumulator.getMaxTime(),
+ cloneTsPrimitiveType(lastAccumulator.getLastValue()));
+ updateTimeValuePairList.add(tv);
+ }
+ } else {
+ // last_by return non-null value
+ LastByDescAccumulator lastByAccumulator =
+ (LastByDescAccumulator) tableAggregator.getAccumulator();
+ if (lastByAccumulator.hasInitResult()) {
+ updateMeasurementList.add(
+ schema.getColumnCategory() == TsTableColumnCategory.TIME ?
"" : schema.getName());
+ long lastTime = lastByAccumulator.getLastTimeOfY();
+ TimeValuePair tv =
+ lastByAccumulator.isXNull()
+ ? EMPTY_TIME_VALUE_PAIR
+ : new TimeValuePair(
+ lastTime,
cloneTsPrimitiveType(lastByAccumulator.getXResult()));
+ updateTimeValuePairList.add(tv);
+ }
+ }
+ }
+
+ if (!updateMeasurementList.isEmpty()) {
+ String[] updateMeasurementArray = updateMeasurementList.toArray(new
String[0]);
+ TimeValuePair[] updateTimeValuePairArray =
+ updateTimeValuePairList.toArray(new TimeValuePair[0]);
+
+ TABLE_DEVICE_SCHEMA_CACHE.initOrInvalidateLastCache(
+ dbName, currentDeviceEntry.getDeviceID(),
updateMeasurementArray, false);
+ TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
+ dbName,
+ currentDeviceEntry.getDeviceID(),
+ updateMeasurementArray,
+ updateTimeValuePairArray);
}
- String[] updateMeasurementArray = updateMeasurementList.toArray(new
String[0]);
- TimeValuePair[] updateTimeValuePairArray =
- updateTimeValuePairList.toArray(new TimeValuePair[0]);
-
- // TABLE_DEVICE_SCHEMA_CACHE.initOrInvalidateLastCache(
- // dbName, currentDeviceEntry.getDeviceID(),
updateMeasurementArray, false);
- // TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
- // dbName,
- // currentDeviceEntry.getDeviceID(),
- // updateMeasurementArray,
- // updateTimeValuePairArray);
outputDeviceIndex++;
fetchLastCacheForCurrentDevice = false;
- appendToResultTsBlockBuilder();
+ appendGroupByToResult();
+ resetTableAggregators();
}
+ } else {
+ outputDeviceIndex++;
+ fetchLastCacheForCurrentDevice = false;
+ resultTsBlockBuilder.declarePosition();
}
}
@@ -307,11 +307,10 @@ public class TableLastQueryOperator extends
TableAggregationTableScanOperator {
TABLE_DEVICE_SCHEMA_CACHE.getLastRow(
dbName, currentDeviceEntry.getDeviceID(), "",
Collections.singletonList(columnName));
- if (lastByResult.isPresent() && lastByResult.get().getRight()[0] !=
null) {
- TsPrimitiveType timeValuePair = lastByResult.get().getRight()[0];
+ if (lastByResult.isPresent() &&
lastByResult.get().getLeft().isPresent()) {
ColumnBuilder columnBuilder =
resultTsBlockBuilder.getColumnBuilder(groupKeySize +
aggregatorNum);
-
+ TsPrimitiveType timeValuePair = lastByResult.get().getRight()[0];
if (timeValuePair == null) {
columnBuilder.appendNull();
} else {
@@ -327,12 +326,12 @@ public class TableLastQueryOperator extends
TableAggregationTableScanOperator {
columnBuilder.writeTsPrimitiveType(timeValuePair);
}
}
+
return;
}
}
unCachedMeasurementToAggregatorIndex.add(aggregatorNum);
-
TableAggregator newAggregator =
new TableAggregator(
aggregator.getAccumulator(),
@@ -344,39 +343,7 @@ public class TableLastQueryOperator extends
TableAggregationTableScanOperator {
// last_by always has three channels
for (int i = 0; i < 3; i++) {
- int aggIdx = initAggregatorInputChannels.get(channelNum + i);
- schema = initAggColumnSchemas.get(aggIdx);
- columnName = schema.getName();
-
- if (!aggColumnLayout.containsKey(columnName)) {
- switch (schema.getColumnCategory()) {
- case ID:
- case ATTRIBUTE:
- newAggColumnsIndexArray.add(initAggColumnsIndexArray[aggIdx]);
- break;
- case MEASUREMENT:
- newAggColumnsIndexArray.add(measurementCount);
- measurementCount++;
- measurementColumnNames.add(schema.getName());
- measurementSchemas.add(
- new MeasurementSchema(schema.getName(),
getTSDataType(schema.getType())));
- break;
- case TIME:
- newAggColumnsIndexArray.add(-1);
- break;
- default:
- throw new IllegalArgumentException(
- "Unexpected category: " + schema.getColumnCategory());
- }
-
- aggColumnSchemas.add(schema);
- aggregatorInputChannels.add(newChannelCnt);
- aggColumnLayout.put(columnName, newChannelCnt++);
- } else {
- aggregatorInputChannels.add(aggColumnLayout.get(columnName));
- }
-
- newAggregator.getInputChannels()[i] = aggColumnLayout.get(columnName);
+ buildNewAggregators(channelNum, newAggregator, i);
}
}
@@ -409,12 +376,12 @@ public class TableLastQueryOperator extends
TableAggregationTableScanOperator {
columnBuilder.writeTsPrimitiveType(timeValuePair.getValue());
}
}
+
return;
}
}
unCachedMeasurementToAggregatorIndex.add(aggregatorNum);
-
TableAggregator newAggregator =
new TableAggregator(
aggregator.getAccumulator(),
@@ -426,40 +393,45 @@ public class TableLastQueryOperator extends
TableAggregationTableScanOperator {
// last_by always has two channels
for (int i = 0; i < 2; i++) {
- int aggIdx = initAggregatorInputChannels.get(channelNum + i);
- schema = initAggColumnSchemas.get(aggIdx);
- columnName = schema.getName();
-
- if (!aggColumnLayout.containsKey(columnName)) {
- switch (schema.getColumnCategory()) {
- case ID:
- case ATTRIBUTE:
- newAggColumnsIndexArray.add(initAggColumnsIndexArray[aggIdx]);
- break;
- case MEASUREMENT:
- newAggColumnsIndexArray.add(measurementCount);
- measurementCount++;
- measurementColumnNames.add(schema.getName());
- measurementSchemas.add(
- new MeasurementSchema(schema.getName(),
getTSDataType(schema.getType())));
- break;
- case TIME:
- newAggColumnsIndexArray.add(-1);
- break;
- default:
- throw new IllegalArgumentException(
- "Unexpected category: " + schema.getColumnCategory());
- }
+ buildNewAggregators(channelNum, newAggregator, i);
+ }
+ }
- aggColumnSchemas.add(schema);
- aggregatorInputChannels.add(newChannelCnt);
- aggColumnLayout.put(columnName, newChannelCnt++);
- } else {
- aggregatorInputChannels.add(aggColumnLayout.get(columnName));
+ private void buildNewAggregators(int channelNum, TableAggregator
newAggregator, int i) {
+ ColumnSchema schema;
+ String columnName;
+ int aggIdx = initAggregatorInputChannels.get(channelNum + i);
+ schema = initAggColumnSchemas.get(aggIdx);
+ columnName = schema.getName();
+
+ if (!aggColumnLayout.containsKey(columnName)) {
+ switch (schema.getColumnCategory()) {
+ case ID:
+ case ATTRIBUTE:
+ newAggColumnsIndexArray.add(initAggColumnsIndexArray[aggIdx]);
+ break;
+ case MEASUREMENT:
+ newAggColumnsIndexArray.add(measurementCount);
+ measurementCount++;
+ measurementColumnNames.add(schema.getName());
+ measurementSchemas.add(
+ new MeasurementSchema(schema.getName(),
getTSDataType(schema.getType())));
+ break;
+ case TIME:
+ newAggColumnsIndexArray.add(-1);
+ break;
+ default:
+ throw new IllegalArgumentException("Unexpected category: " +
schema.getColumnCategory());
}
- newAggregator.getInputChannels()[i] = aggColumnLayout.get(columnName);
+ aggColumnSchemas.add(schema);
+ aggregatorInputChannels.add(newChannelCnt);
+ aggColumnLayout.put(columnName, newChannelCnt++);
+ } else {
+ aggregatorInputChannels.add(aggColumnLayout.get(columnName));
}
+
+ newAggregator.getInputChannels()[i] = aggColumnLayout.get(columnName);
}
private boolean hasUnCachedColumns() {
@@ -483,31 +455,14 @@ public class TableLastQueryOperator extends
TableAggregationTableScanOperator {
// seriesScanOptions.getPushLimitToEachDevice());
deviceEntries.clear();
deviceEntries.add(currentDeviceEntry);
+ currentDeviceIndex = 0;
this.measurementColumnTSDataTypes =
measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
constructAlignedSeriesScanUtil();
this.seriesScanUtil.initQueryDataSource(queryDataSource);
- aggTableScanOperator =
- new TableAggregationTableScanOperator(
- sourceId,
- operatorContext,
- aggColumnSchemas,
- aggColumnsIndexArray,
- Collections.singletonList(currentDeviceEntry),
- seriesScanOptions,
- measurementColumnNames,
- allSensors,
- measurementSchemas,
- tableAggregators,
- Collections.emptyList(),
- null,
- timeIterator,
- false,
- canUseStatistics,
- aggregatorInputChannels);
-
+ this.hasBuildAggTableScanArguments = true;
// aggTableScanOperator.initQueryDataSource(this.queryDataSource);
}
@@ -528,45 +483,63 @@ public class TableLastQueryOperator extends
TableAggregationTableScanOperator {
@Override
protected void updateResultTsBlock() {
- // appendAggregationResult(resultTsBlockBuilder, tableAggregators);
+ appendAggregationResult(resultTsBlockBuilder, tableAggregators);
// after appendAggregationResult invoked, aggregators must be cleared
// resetTableAggregators();
}
- private void appendToResultTsBlockBuilder() {
+ @Override
+ /** Append a row of aggregation results to the result tsBlock. */
+ public void appendAggregationResult(
+ TsBlockBuilder tsBlockBuilder, List<? extends TableAggregator>
aggregators) {
+
+ // no data in current time range, just output empty
+ if (!timeIterator.hasCachedTimeRange()) {
+ return;
+ }
+
+ ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
+
+ for (int i = 0; i < aggregators.size(); i++) {
+ aggregators
+ .get(i)
+ .evaluate(columnBuilders[groupKeySize +
unCachedMeasurementToAggregatorIndex.get(i)]);
+ }
+
+ tsBlockBuilder.declarePosition();
+ }
+
+ private void appendGroupByToResult() {
ColumnBuilder[] columnBuilders =
resultTsBlockBuilder.getValueColumnBuilders();
- for (int i = 0; i < groupKeySize; i++) {
- if (TsTableColumnCategory.ID ==
groupingKeySchemas.get(i).getColumnCategory()) {
- String id =
- (String)
-
initDeviceEntries.get(outputDeviceIndex).getNthSegment(groupingKeyIndex[i] + 1);
- if (id == null) {
- columnBuilders[i].appendNull();
- } else {
- columnBuilders[i].writeBinary(new Binary(id,
TSFileConfig.STRING_CHARSET));
- }
- } else {
- Binary attribute =
- initDeviceEntries
- .get(outputDeviceIndex)
- .getAttributeColumnValues()
- .get(groupingKeyIndex[i]);
- if (attribute == null) {
- columnBuilders[i].appendNull();
+ if (groupingKeyIndex != null) {
+ for (int i = 0; i < groupKeySize; i++) {
+ if (TsTableColumnCategory.ID ==
groupingKeySchemas.get(i).getColumnCategory()) {
+ String id =
+ (String)
deviceEntries.get(currentDeviceIndex).getNthSegment(groupingKeyIndex[i] + 1);
+ if (id == null) {
+ columnBuilders[i].appendNull();
+ } else {
+ columnBuilders[i].writeBinary(new Binary(id,
TSFileConfig.STRING_CHARSET));
+ }
} else {
- columnBuilders[i].writeBinary(attribute);
+ Binary attribute =
+ deviceEntries
+ .get(currentDeviceIndex)
+ .getAttributeColumnValues()
+ .get(groupingKeyIndex[i]);
+ if (attribute == null) {
+ columnBuilders[i].appendNull();
+ } else {
+ columnBuilders[i].writeBinary(attribute);
+ }
}
}
}
- resultTsBlockBuilder.declarePosition();
}
- private void resetAggArguments() throws Exception {
- if (aggTableScanOperator != null) {
- aggTableScanOperator.close();
- }
- aggTableScanOperator = null;
+ private void resetAggArguments() {
+ hasBuildAggTableScanArguments = false;
tableAggregators = new ArrayList<>();
newChannelCnt = 0;
@@ -591,6 +564,32 @@ public class TableLastQueryOperator extends
TableAggregationTableScanOperator {
resultTsBlockBuilder.reset();
}
+ private TsPrimitiveType cloneTsPrimitiveType(TsPrimitiveType originalValue) {
+ switch (originalValue.getDataType()) {
+ case BOOLEAN:
+ return new TsPrimitiveType.TsBoolean(originalValue.getBoolean());
+ case INT32:
+ case DATE:
+ return new TsPrimitiveType.TsInt(originalValue.getInt());
+ case INT64:
+ case TIMESTAMP:
+ return new TsPrimitiveType.TsLong(originalValue.getLong());
+ case FLOAT:
+ return new TsPrimitiveType.TsFloat(originalValue.getFloat());
+ case DOUBLE:
+ return new TsPrimitiveType.TsDouble(originalValue.getDouble());
+ case TEXT:
+ case BLOB:
+ case STRING:
+ return new TsPrimitiveType.TsBinary(originalValue.getBinary());
+ case VECTOR:
+ return new TsPrimitiveType.TsVector(originalValue.getVector());
+ default:
+ throw new UnSupportedDataTypeException(
+ "Unsupported data type:" + originalValue.getDataType());
+ }
+ }
+
// @Override
// protected void constructAlignedSeriesScanUtil() {
// // TODO?
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
index ab9f876d7e3..c29d5ec6faf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
@@ -49,6 +49,18 @@ public class LastAccumulator implements TableAccumulator {
lastValue = TsPrimitiveType.getByType(seriesDataType);
}
+ public boolean hasInitResult() {
+ return this.initResult;
+ }
+
+ public long getMaxTime() {
+ return this.maxTime;
+ }
+
+ public TsPrimitiveType getLastValue() {
+ return this.lastValue;
+ }
+
@Override
public long getEstimatedSize() {
return INSTANCE_SIZE;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java
index 2ad6b8dd089..1605f7d6d3e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java
@@ -45,8 +45,8 @@ public class LastByAccumulator implements TableAccumulator {
protected final TSDataType xDataType;
protected final TSDataType yDataType;
- private final boolean xIsTimeColumn;
- private final boolean yIsTimeColumn;
+ protected final boolean xIsTimeColumn;
+ protected final boolean yIsTimeColumn;
private long yLastTime = Long.MIN_VALUE;
@@ -69,6 +69,22 @@ public class LastByAccumulator implements TableAccumulator {
return this.yIsTimeColumn;
}
+ public boolean hasInitResult() {
+ return this.initResult;
+ }
+
+ public long getLastTimeOfY() {
+ return this.yLastTime;
+ }
+
+ public boolean isXNull() {
+ return xIsNull;
+ }
+
+ public TsPrimitiveType getXResult() {
+ return this.xResult;
+ }
+
@Override
public long getEstimatedSize() {
return INSTANCE_SIZE;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
index a44b41d6223..d6e81aacc96 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
@@ -29,6 +29,11 @@ public class LastByDescAccumulator extends LastByAccumulator
{
super(xDataType, yDataType, xIsTimeColumn, yIsTimeColumn);
}
+ @Override
+ public TableAccumulator copy() {
+ return new LastByDescAccumulator(xDataType, yDataType, xIsTimeColumn,
yIsTimeColumn);
+ }
+
@Override
public boolean hasFinalResult() {
return initResult;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
index bc0e97f9b7b..0fad4cec0db 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
@@ -28,6 +28,11 @@ public class LastDescAccumulator extends LastAccumulator {
super(seriesDataType);
}
+ @Override
+ public TableAccumulator copy() {
+ return new LastDescAccumulator(seriesDataType);
+ }
+
@Override
public boolean hasFinalResult() {
return initResult;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 951015be1e9..1bb0161e665 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.plan.planner;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.path.AlignedFullPath;
import org.apache.iotdb.commons.schema.column.ColumnHeader;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -1952,6 +1953,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
aggColumnSchemas,
aggColumnsIndexArray,
node.getDeviceEntries(),
+ node.getDeviceEntries().size(),
seriesScanOptions,
measurementColumnNames,
allSensors,
@@ -2078,6 +2080,10 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
}
private boolean canUseLastCacheOptimize(List<TableAggregator> aggregators) {
+ if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable() &&
aggregators.isEmpty()) {
+ return false;
+ }
+
for (TableAggregator aggregator : aggregators) {
if (!(aggregator.getAccumulator() instanceof LastDescAccumulator
|| (aggregator.getAccumulator() instanceof LastByDescAccumulator
@@ -2085,6 +2091,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
return false;
}
}
+
return true;
}
}