This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch EnhancedDeviceCrossRegionIT
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/EnhancedDeviceCrossRegionIT by
this push:
new 5ff934eb60d Fix early termination logic of first, first_by, last,
last_by accumulators (#16333)
5ff934eb60d is described below
commit 5ff934eb60ddd5d923113305d00976417ae0cce7
Author: Weihao Li <[email protected]>
AuthorDate: Wed Sep 3 09:35:19 2025 +0800
Fix early termination logic of first, first_by, last, last_by accumulators
(#16333)
---
.../relational/aggregation/AccumulatorFactory.java | 40 ++++++++++---
.../relational/aggregation/FirstAccumulator.java | 56 ++++++++++++-----
.../relational/aggregation/FirstByAccumulator.java | 70 ++++++++++++++++------
.../aggregation/FirstByDescAccumulator.java | 7 ++-
.../aggregation/FirstDescAccumulator.java | 7 ++-
.../aggregation/LastByDescAccumulator.java | 58 +++++++++++++-----
.../aggregation/LastDescAccumulator.java | 12 +++-
.../plan/planner/TableOperatorGenerator.java | 11 +++-
.../process/window/function/FunctionTestUtils.java | 3 +-
9 files changed, 198 insertions(+), 66 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
index b22ffcbf849..d7195cc6dfb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
@@ -94,6 +94,7 @@ public class AccumulatorFactory {
List<Expression> inputExpressions,
Map<String, String> inputAttributes,
boolean ascending,
+ boolean isAggTableScan,
String timeColumnName,
Set<String> measurementColumnNames,
boolean distinct) {
@@ -107,6 +108,8 @@ public class AccumulatorFactory {
&& inputExpressions.size() > 1) {
boolean xIsTimeColumn = isTimeColumn(inputExpressions.get(0),
timeColumnName);
boolean yIsTimeColumn = isTimeColumn(inputExpressions.get(1),
timeColumnName);
+ // When used in AggTableScanOperator, we can finish calculation of
+ // LastDesc/LastByDesc/First/First_by after the result has been initialized
if (LAST_BY.getFunctionName().equals(functionName)) {
result =
ascending
@@ -118,12 +121,17 @@ public class AccumulatorFactory {
xIsTimeColumn,
yIsTimeColumn,
isMeasurementColumn(inputExpressions.get(0),
measurementColumnNames),
- isMeasurementColumn(inputExpressions.get(1),
measurementColumnNames));
+ isMeasurementColumn(inputExpressions.get(1),
measurementColumnNames),
+ isAggTableScan);
} else {
result =
ascending
? new FirstByAccumulator(
- inputDataTypes.get(0), inputDataTypes.get(1),
xIsTimeColumn, yIsTimeColumn)
+ inputDataTypes.get(0),
+ inputDataTypes.get(1),
+ xIsTimeColumn,
+ yIsTimeColumn,
+ isAggTableScan)
: new FirstByDescAccumulator(
inputDataTypes.get(0), inputDataTypes.get(1),
xIsTimeColumn, yIsTimeColumn);
}
@@ -133,11 +141,17 @@ public class AccumulatorFactory {
: new LastDescAccumulator(
inputDataTypes.get(0),
isTimeColumn(inputExpressions.get(0), timeColumnName),
- isMeasurementColumn(inputExpressions.get(0),
measurementColumnNames));
+ isMeasurementColumn(inputExpressions.get(0),
measurementColumnNames),
+ isAggTableScan);
} else {
result =
createBuiltinAccumulator(
- aggregationType, inputDataTypes, inputExpressions,
inputAttributes, ascending);
+ aggregationType,
+ inputDataTypes,
+ inputExpressions,
+ inputAttributes,
+ ascending,
+ isAggTableScan);
}
if (distinct) {
@@ -279,7 +293,8 @@ public class AccumulatorFactory {
List<TSDataType> inputDataTypes,
List<Expression> inputExpressions,
Map<String, String> inputAttributes,
- boolean ascending) {
+ boolean ascending,
+ boolean isAggTableScan) {
switch (aggregationType) {
case COUNT:
return new CountAccumulator();
@@ -294,10 +309,10 @@ public class AccumulatorFactory {
case LAST:
return ascending
? new LastAccumulator(inputDataTypes.get(0))
- : new LastDescAccumulator(inputDataTypes.get(0), false, false);
+ : new LastDescAccumulator(inputDataTypes.get(0), false, false,
isAggTableScan);
case FIRST:
return ascending
- ? new FirstAccumulator(inputDataTypes.get(0))
+ ? new FirstAccumulator(inputDataTypes.get(0), isAggTableScan)
: new FirstDescAccumulator(inputDataTypes.get(0));
case MAX:
return new MaxAccumulator(inputDataTypes.get(0));
@@ -307,10 +322,17 @@ public class AccumulatorFactory {
return ascending
? new LastByAccumulator(inputDataTypes.get(0),
inputDataTypes.get(1), false, false)
: new LastByDescAccumulator(
- inputDataTypes.get(0), inputDataTypes.get(1), false, false,
false, false);
+ inputDataTypes.get(0),
+ inputDataTypes.get(1),
+ false,
+ false,
+ false,
+ false,
+ isAggTableScan);
case FIRST_BY:
return ascending
- ? new FirstByAccumulator(inputDataTypes.get(0),
inputDataTypes.get(1), false, false)
+ ? new FirstByAccumulator(
+ inputDataTypes.get(0), inputDataTypes.get(1), false, false,
isAggTableScan)
: new FirstByDescAccumulator(
inputDataTypes.get(0), inputDataTypes.get(1), false, false);
case MAX_BY:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstAccumulator.java
index b01349e6082..3e4111fd454 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstAccumulator.java
@@ -43,10 +43,12 @@ public class FirstAccumulator implements TableAccumulator {
protected TsPrimitiveType firstValue;
protected long minTime = Long.MAX_VALUE;
protected boolean initResult = false;
+ private final boolean canFinishAfterInit;
- public FirstAccumulator(TSDataType seriesDataType) {
+ public FirstAccumulator(TSDataType seriesDataType, boolean
canFinishAfterInit) {
this.seriesDataType = seriesDataType;
firstValue = TsPrimitiveType.getByType(seriesDataType);
+ this.canFinishAfterInit = canFinishAfterInit;
}
@Override
@@ -56,7 +58,7 @@ public class FirstAccumulator implements TableAccumulator {
@Override
public TableAccumulator copy() {
- return new FirstAccumulator(seriesDataType);
+ return new FirstAccumulator(seriesDataType, canFinishAfterInit);
}
@Override
@@ -196,7 +198,7 @@ public class FirstAccumulator implements TableAccumulator {
@Override
public boolean hasFinalResult() {
- return initResult;
+ return canFinishAfterInit && initResult;
}
@Override
@@ -250,7 +252,9 @@ public class FirstAccumulator implements TableAccumulator {
for (int i = 0; i < positionCount; i++) {
if (!valueColumn.isNull(i)) {
updateIntFirstValue(valueColumn.getInt(i), timeColumn.getLong(i));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
} else {
@@ -260,7 +264,9 @@ public class FirstAccumulator implements TableAccumulator {
position = selectedPositions[i];
if (!valueColumn.isNull(position)) {
updateIntFirstValue(valueColumn.getInt(position),
timeColumn.getLong(position));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
}
@@ -281,7 +287,9 @@ public class FirstAccumulator implements TableAccumulator {
for (int i = 0; i < positionCount; i++) {
if (!valueColumn.isNull(i)) {
updateLongFirstValue(valueColumn.getLong(i), timeColumn.getLong(i));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
} else {
@@ -291,7 +299,9 @@ public class FirstAccumulator implements TableAccumulator {
position = selectedPositions[i];
if (!valueColumn.isNull(position)) {
updateLongFirstValue(valueColumn.getLong(position),
timeColumn.getLong(position));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
}
@@ -312,7 +322,9 @@ public class FirstAccumulator implements TableAccumulator {
for (int i = 0; i < positionCount; i++) {
if (!valueColumn.isNull(i)) {
updateFloatFirstValue(valueColumn.getFloat(i),
timeColumn.getLong(i));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
} else {
@@ -322,7 +334,9 @@ public class FirstAccumulator implements TableAccumulator {
position = selectedPositions[i];
if (!valueColumn.isNull(position)) {
updateFloatFirstValue(valueColumn.getFloat(position),
timeColumn.getLong(position));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
}
@@ -343,7 +357,9 @@ public class FirstAccumulator implements TableAccumulator {
for (int i = 0; i < positionCount; i++) {
if (!valueColumn.isNull(i)) {
updateDoubleFirstValue(valueColumn.getDouble(i),
timeColumn.getLong(i));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
} else {
@@ -353,7 +369,9 @@ public class FirstAccumulator implements TableAccumulator {
position = selectedPositions[i];
if (!valueColumn.isNull(position)) {
updateDoubleFirstValue(valueColumn.getDouble(position),
timeColumn.getLong(position));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
}
@@ -374,7 +392,9 @@ public class FirstAccumulator implements TableAccumulator {
for (int i = 0; i < positionCount; i++) {
if (!valueColumn.isNull(i)) {
updateBinaryFirstValue(valueColumn.getBinary(i),
timeColumn.getLong(i));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
} else {
@@ -384,7 +404,9 @@ public class FirstAccumulator implements TableAccumulator {
position = selectedPositions[i];
if (!valueColumn.isNull(position)) {
updateBinaryFirstValue(valueColumn.getBinary(position),
timeColumn.getLong(position));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
}
@@ -405,7 +427,9 @@ public class FirstAccumulator implements TableAccumulator {
for (int i = 0; i < positionCount; i++) {
if (!valueColumn.isNull(i)) {
updateBooleanFirstValue(valueColumn.getBoolean(i),
timeColumn.getLong(i));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
} else {
@@ -415,7 +439,9 @@ public class FirstAccumulator implements TableAccumulator {
position = selectedPositions[i];
if (!valueColumn.isNull(position)) {
updateBooleanFirstValue(valueColumn.getBoolean(position),
timeColumn.getLong(position));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByAccumulator.java
index 63c87bc65d1..e53bfd0bfd5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByAccumulator.java
@@ -40,11 +40,11 @@ public class FirstByAccumulator implements TableAccumulator
{
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(FirstByAccumulator.class);
- private final TSDataType xDataType;
- private final TSDataType yDataType;
+ protected final TSDataType xDataType;
+ protected final TSDataType yDataType;
- private final boolean xIsTimeColumn;
- private final boolean yIsTimeColumn;
+ protected final boolean xIsTimeColumn;
+ protected final boolean yIsTimeColumn;
private long yFirstTime = Long.MAX_VALUE;
@@ -53,14 +53,21 @@ public class FirstByAccumulator implements TableAccumulator
{
private boolean initResult = false;
+ private final boolean canFinishAfterInit;
+
public FirstByAccumulator(
- TSDataType xDataType, TSDataType yDataType, boolean xIsTimeColumn,
boolean yIsTimeColumn) {
+ TSDataType xDataType,
+ TSDataType yDataType,
+ boolean xIsTimeColumn,
+ boolean yIsTimeColumn,
+ boolean canFinishAfterInit) {
this.xDataType = xDataType;
this.yDataType = yDataType;
this.xIsTimeColumn = xIsTimeColumn;
this.yIsTimeColumn = yIsTimeColumn;
this.xResult = TsPrimitiveType.getByType(xDataType);
+ this.canFinishAfterInit = canFinishAfterInit;
}
@Override
@@ -70,7 +77,8 @@ public class FirstByAccumulator implements TableAccumulator {
@Override
public TableAccumulator copy() {
- return new FirstByAccumulator(xDataType, yDataType, xIsTimeColumn,
yIsTimeColumn);
+ return new FirstByAccumulator(
+ xDataType, yDataType, xIsTimeColumn, yIsTimeColumn,
canFinishAfterInit);
}
@Override
@@ -225,7 +233,7 @@ public class FirstByAccumulator implements TableAccumulator
{
@Override
public boolean hasFinalResult() {
- return initResult;
+ return canFinishAfterInit && initResult;
}
@Override
@@ -309,7 +317,9 @@ public class FirstByAccumulator implements TableAccumulator
{
for (int i = 0; i < positionCount; i++) {
if (!yColumn.isNull(i)) {
updateIntFirstValue(xColumn, i, timeColumn.getLong(i));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
} else {
@@ -319,7 +329,9 @@ public class FirstByAccumulator implements TableAccumulator
{
position = selectedPositions[i];
if (!yColumn.isNull(position)) {
updateIntFirstValue(xColumn, position, timeColumn.getLong(position));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
}
@@ -355,7 +367,9 @@ public class FirstByAccumulator implements TableAccumulator
{
for (int i = 0; i < positionCount; i++) {
if (!yColumn.isNull(i)) {
updateLongFirstValue(xColumn, i, timeColumn.getLong(i));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
} else {
@@ -365,7 +379,9 @@ public class FirstByAccumulator implements TableAccumulator
{
position = selectedPositions[i];
if (!yColumn.isNull(position)) {
updateLongFirstValue(xColumn, position,
timeColumn.getLong(position));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
}
@@ -401,7 +417,9 @@ public class FirstByAccumulator implements TableAccumulator
{
for (int i = 0; i < positionCount; i++) {
if (!yColumn.isNull(i)) {
updateFloatFirstValue(xColumn, i, timeColumn.getLong(i));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
} else {
@@ -411,7 +429,9 @@ public class FirstByAccumulator implements TableAccumulator
{
position = selectedPositions[i];
if (!yColumn.isNull(position)) {
updateFloatFirstValue(xColumn, position,
timeColumn.getLong(position));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
}
@@ -447,7 +467,9 @@ public class FirstByAccumulator implements TableAccumulator
{
for (int i = 0; i < positionCount; i++) {
if (!yColumn.isNull(i)) {
updateDoubleFirstValue(xColumn, i, timeColumn.getLong(i));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
} else {
@@ -457,7 +479,9 @@ public class FirstByAccumulator implements TableAccumulator
{
position = selectedPositions[i];
if (!yColumn.isNull(position)) {
updateDoubleFirstValue(xColumn, position,
timeColumn.getLong(position));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
}
@@ -493,7 +517,9 @@ public class FirstByAccumulator implements TableAccumulator
{
for (int i = 0; i < positionCount; i++) {
if (!yColumn.isNull(i)) {
updateBinaryFirstValue(xColumn, i, timeColumn.getLong(i));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
} else {
@@ -503,7 +529,9 @@ public class FirstByAccumulator implements TableAccumulator
{
position = selectedPositions[i];
if (!yColumn.isNull(position)) {
updateBinaryFirstValue(xColumn, position,
timeColumn.getLong(position));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
}
@@ -539,7 +567,9 @@ public class FirstByAccumulator implements TableAccumulator
{
for (int i = 0; i < positionCount; i++) {
if (!yColumn.isNull(i)) {
updateBooleanFirstValue(xColumn, i, timeColumn.getLong(i));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
} else {
@@ -549,7 +579,9 @@ public class FirstByAccumulator implements TableAccumulator
{
position = selectedPositions[i];
if (!yColumn.isNull(position)) {
updateBooleanFirstValue(xColumn, position,
timeColumn.getLong(position));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByDescAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByDescAccumulator.java
index 130ffbb676d..c6f5fdb9a76 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByDescAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByDescAccumulator.java
@@ -26,7 +26,7 @@ public class FirstByDescAccumulator extends
FirstByAccumulator {
public FirstByDescAccumulator(
TSDataType xDataType, TSDataType yDataType, boolean xIsTimeColumn,
boolean yIsTimeColumn) {
- super(xDataType, yDataType, xIsTimeColumn, yIsTimeColumn);
+ super(xDataType, yDataType, xIsTimeColumn, yIsTimeColumn, false);
}
@Override
@@ -34,6 +34,11 @@ public class FirstByDescAccumulator extends
FirstByAccumulator {
return false;
}
+ @Override
+ public TableAccumulator copy() {
+ return new FirstByDescAccumulator(xDataType, yDataType, xIsTimeColumn,
yIsTimeColumn);
+ }
+
@Override
protected void addIntInput(
Column xColumn, Column yColumn, Column timeColumn, AggregationMask mask)
{
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstDescAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstDescAccumulator.java
index 009be6c778b..d8cc8ef0286 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstDescAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstDescAccumulator.java
@@ -25,7 +25,7 @@ import org.apache.tsfile.enums.TSDataType;
public class FirstDescAccumulator extends FirstAccumulator {
public FirstDescAccumulator(TSDataType seriesDataType) {
- super(seriesDataType);
+ super(seriesDataType, false);
}
@Override
@@ -33,6 +33,11 @@ public class FirstDescAccumulator extends FirstAccumulator {
return false;
}
+ @Override
+ public TableAccumulator copy() {
+ return new FirstDescAccumulator(seriesDataType);
+ }
+
@Override
protected void addIntInput(Column valueColumn, Column timeColumn,
AggregationMask mask) {
int positionCount = mask.getSelectedPositionCount();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
index 04bf1213f61..37ac6c79036 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
@@ -25,6 +25,7 @@ import org.apache.tsfile.enums.TSDataType;
public class LastByDescAccumulator extends LastByAccumulator {
private final boolean xIsMeasurementColumn;
private final boolean yIsMeasurementColumn;
+ private final boolean canFinishAfterInit;
public LastByDescAccumulator(
TSDataType xDataType,
@@ -32,10 +33,12 @@ public class LastByDescAccumulator extends
LastByAccumulator {
boolean xIsTimeColumn,
boolean yIsTimeColumn,
boolean xIsMeasurementColumn,
- boolean yIsMeasurementColumn) {
+ boolean yIsMeasurementColumn,
+ boolean canFinishAfterInit) {
super(xDataType, yDataType, xIsTimeColumn, yIsTimeColumn);
this.xIsMeasurementColumn = xIsMeasurementColumn;
this.yIsMeasurementColumn = yIsMeasurementColumn;
+ this.canFinishAfterInit = canFinishAfterInit;
}
public boolean xIsTimeColumn() {
@@ -62,12 +65,13 @@ public class LastByDescAccumulator extends
LastByAccumulator {
xIsTimeColumn,
yIsTimeColumn,
xIsMeasurementColumn,
- yIsMeasurementColumn);
+ yIsMeasurementColumn,
+ canFinishAfterInit);
}
@Override
public boolean hasFinalResult() {
- return initResult;
+ return canFinishAfterInit && initResult;
}
@Override
@@ -79,7 +83,9 @@ public class LastByDescAccumulator extends LastByAccumulator {
for (int i = 0; i < positionCount; i++) {
if (!yColumn.isNull(i)) {
updateIntLastValue(xColumn, i, timeColumn.getLong(i));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
} else {
@@ -89,7 +95,9 @@ public class LastByDescAccumulator extends LastByAccumulator {
position = selectedPositions[i];
if (!yColumn.isNull(position)) {
updateIntLastValue(xColumn, position, timeColumn.getLong(position));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
}
@@ -104,7 +112,9 @@ public class LastByDescAccumulator extends
LastByAccumulator {
for (int i = 0; i < positionCount; i++) {
if (!yColumn.isNull(i)) {
updateLongLastValue(xColumn, i, timeColumn.getLong(i));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
} else {
@@ -114,7 +124,9 @@ public class LastByDescAccumulator extends
LastByAccumulator {
position = selectedPositions[i];
if (!yColumn.isNull(position)) {
updateLongLastValue(xColumn, position, timeColumn.getLong(position));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
}
@@ -129,7 +141,9 @@ public class LastByDescAccumulator extends
LastByAccumulator {
for (int i = 0; i < positionCount; i++) {
if (!yColumn.isNull(i)) {
updateFloatLastValue(xColumn, i, timeColumn.getLong(i));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
} else {
@@ -139,7 +153,9 @@ public class LastByDescAccumulator extends
LastByAccumulator {
position = selectedPositions[i];
if (!yColumn.isNull(position)) {
updateFloatLastValue(xColumn, position,
timeColumn.getLong(position));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
}
@@ -154,7 +170,9 @@ public class LastByDescAccumulator extends
LastByAccumulator {
for (int i = 0; i < positionCount; i++) {
if (!yColumn.isNull(i)) {
updateDoubleLastValue(xColumn, i, timeColumn.getLong(i));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
} else {
@@ -164,7 +182,9 @@ public class LastByDescAccumulator extends
LastByAccumulator {
position = selectedPositions[i];
if (!yColumn.isNull(position)) {
updateDoubleLastValue(xColumn, position,
timeColumn.getLong(position));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
}
@@ -179,7 +199,9 @@ public class LastByDescAccumulator extends
LastByAccumulator {
for (int i = 0; i < positionCount; i++) {
if (!yColumn.isNull(i)) {
updateBinaryLastValue(xColumn, i, timeColumn.getLong(i));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
} else {
@@ -189,7 +211,9 @@ public class LastByDescAccumulator extends
LastByAccumulator {
position = selectedPositions[i];
if (!yColumn.isNull(position)) {
updateBinaryLastValue(xColumn, position,
timeColumn.getLong(position));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
}
@@ -204,7 +228,9 @@ public class LastByDescAccumulator extends
LastByAccumulator {
for (int i = 0; i < positionCount; i++) {
if (!yColumn.isNull(i)) {
updateBooleanLastValue(xColumn, i, timeColumn.getLong(i));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
} else {
@@ -214,7 +240,9 @@ public class LastByDescAccumulator extends
LastByAccumulator {
position = selectedPositions[i];
if (!yColumn.isNull(position)) {
updateBooleanLastValue(xColumn, position,
timeColumn.getLong(position));
- return;
+ if (canFinishAfterInit) {
+ return;
+ }
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
index 23ee0d21054..d015b759957 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
@@ -25,12 +25,17 @@ import org.apache.tsfile.enums.TSDataType;
public class LastDescAccumulator extends LastAccumulator {
private final boolean isTimeColumn;
private final boolean isMeasurementColumn;
+ private final boolean canFinishAfterInit;
public LastDescAccumulator(
- TSDataType seriesDataType, boolean isTimeColumn, boolean
isMeasurementColumn) {
+ TSDataType seriesDataType,
+ boolean isTimeColumn,
+ boolean isMeasurementColumn,
+ boolean canFinishAfterInit) {
super(seriesDataType);
this.isTimeColumn = isTimeColumn;
this.isMeasurementColumn = isMeasurementColumn;
+ this.canFinishAfterInit = canFinishAfterInit;
}
public boolean isTimeColumn() {
@@ -43,12 +48,13 @@ public class LastDescAccumulator extends LastAccumulator {
@Override
public TableAccumulator copy() {
- return new LastDescAccumulator(seriesDataType, isTimeColumn,
isMeasurementColumn);
+ return new LastDescAccumulator(
+ seriesDataType, isTimeColumn, isMeasurementColumn, canFinishAfterInit);
}
@Override
public boolean hasFinalResult() {
- return initResult;
+ return canFinishAfterInit && initResult;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 7e308dd8ca6..d6c2e53c24b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -2531,6 +2531,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
node.getStep(),
typeProvider,
true,
+ false,
null,
Collections.emptySet())));
return new AggregationOperator(operatorContext, child,
aggregatorBuilder.build());
@@ -2544,6 +2545,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
AggregationNode.Step step,
TypeProvider typeProvider,
boolean scanAscending,
+ boolean isAggTableScan,
String timeColumnName,
Set<String> measurementColumnNames) {
List<Integer> argumentChannels = new ArrayList<>();
@@ -2565,6 +2567,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
aggregation.getArguments(),
Collections.emptyMap(),
scanAscending,
+ isAggTableScan,
timeColumnName,
measurementColumnNames,
aggregation.isDistinct());
@@ -2610,6 +2613,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
node.getStep(),
typeProvider,
true,
+ false,
null,
Collections.emptySet())));
@@ -2905,6 +2909,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
node.getStep(),
context.getTypeProvider(),
scanAscending,
+ true,
timeColumnName,
measurementColumnsIndexMap.keySet()));
}
@@ -3370,7 +3375,8 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
originalArgumentTypes,
arguments.stream().map(Map.Entry::getKey).collect(Collectors.toList()),
Collections.emptyMap(),
- true);
+ true,
+ false);
BoundSignature signature = resolvedFunction.getSignature();
@@ -4070,7 +4076,8 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
originalArgumentTypes,
function.getArguments(),
Collections.emptyMap(),
- true);
+ true,
+ false);
// Create aggregator by accumulator
return new WindowAggregator(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/FunctionTestUtils.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/FunctionTestUtils.java
index ccec0f68563..47f9063d899 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/FunctionTestUtils.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/FunctionTestUtils.java
@@ -105,7 +105,8 @@ public class FunctionTestUtils {
Collections.singletonList(inputDataType),
new ArrayList<>(),
new HashMap<>(),
- ascending);
+ ascending,
+ false);
WindowAggregator aggregator =
new WindowAggregator(accumulator, outputDataType,
Collections.singletonList(0));
return new AggregationWindowFunction(aggregator);