This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch EnhancedDeviceCrossRegionIT
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/EnhancedDeviceCrossRegionIT by 
this push:
     new 5ff934eb60d Fix early termination logic of first, first_by, last, 
last_by accumulators (#16333)
5ff934eb60d is described below

commit 5ff934eb60ddd5d923113305d00976417ae0cce7
Author: Weihao Li <[email protected]>
AuthorDate: Wed Sep 3 09:35:19 2025 +0800

    Fix early termination logic of first, first_by, last, last_by accumulators 
(#16333)
---
 .../relational/aggregation/AccumulatorFactory.java | 40 ++++++++++---
 .../relational/aggregation/FirstAccumulator.java   | 56 ++++++++++++-----
 .../relational/aggregation/FirstByAccumulator.java | 70 ++++++++++++++++------
 .../aggregation/FirstByDescAccumulator.java        |  7 ++-
 .../aggregation/FirstDescAccumulator.java          |  7 ++-
 .../aggregation/LastByDescAccumulator.java         | 58 +++++++++++++-----
 .../aggregation/LastDescAccumulator.java           | 12 +++-
 .../plan/planner/TableOperatorGenerator.java       | 11 +++-
 .../process/window/function/FunctionTestUtils.java |  3 +-
 9 files changed, 198 insertions(+), 66 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
index b22ffcbf849..d7195cc6dfb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
@@ -94,6 +94,7 @@ public class AccumulatorFactory {
       List<Expression> inputExpressions,
       Map<String, String> inputAttributes,
       boolean ascending,
+      boolean isAggTableScan,
       String timeColumnName,
       Set<String> measurementColumnNames,
       boolean distinct) {
@@ -107,6 +108,8 @@ public class AccumulatorFactory {
         && inputExpressions.size() > 1) {
       boolean xIsTimeColumn = isTimeColumn(inputExpressions.get(0), 
timeColumnName);
       boolean yIsTimeColumn = isTimeColumn(inputExpressions.get(1), 
timeColumnName);
+      // When used in AggTableScanOperator, we can finish calculation of
+      // LastDesc/LastByDesc/First/First_by after the result has been 
initialized
       if (LAST_BY.getFunctionName().equals(functionName)) {
         result =
             ascending
@@ -118,12 +121,17 @@ public class AccumulatorFactory {
                     xIsTimeColumn,
                     yIsTimeColumn,
                     isMeasurementColumn(inputExpressions.get(0), 
measurementColumnNames),
-                    isMeasurementColumn(inputExpressions.get(1), 
measurementColumnNames));
+                    isMeasurementColumn(inputExpressions.get(1), 
measurementColumnNames),
+                    isAggTableScan);
       } else {
         result =
             ascending
                 ? new FirstByAccumulator(
-                    inputDataTypes.get(0), inputDataTypes.get(1), 
xIsTimeColumn, yIsTimeColumn)
+                    inputDataTypes.get(0),
+                    inputDataTypes.get(1),
+                    xIsTimeColumn,
+                    yIsTimeColumn,
+                    isAggTableScan)
                 : new FirstByDescAccumulator(
                     inputDataTypes.get(0), inputDataTypes.get(1), 
xIsTimeColumn, yIsTimeColumn);
       }
@@ -133,11 +141,17 @@ public class AccumulatorFactory {
           : new LastDescAccumulator(
               inputDataTypes.get(0),
               isTimeColumn(inputExpressions.get(0), timeColumnName),
-              isMeasurementColumn(inputExpressions.get(0), 
measurementColumnNames));
+              isMeasurementColumn(inputExpressions.get(0), 
measurementColumnNames),
+              isAggTableScan);
     } else {
       result =
           createBuiltinAccumulator(
-              aggregationType, inputDataTypes, inputExpressions, 
inputAttributes, ascending);
+              aggregationType,
+              inputDataTypes,
+              inputExpressions,
+              inputAttributes,
+              ascending,
+              isAggTableScan);
     }
 
     if (distinct) {
@@ -279,7 +293,8 @@ public class AccumulatorFactory {
       List<TSDataType> inputDataTypes,
       List<Expression> inputExpressions,
       Map<String, String> inputAttributes,
-      boolean ascending) {
+      boolean ascending,
+      boolean isAggTableScan) {
     switch (aggregationType) {
       case COUNT:
         return new CountAccumulator();
@@ -294,10 +309,10 @@ public class AccumulatorFactory {
       case LAST:
         return ascending
             ? new LastAccumulator(inputDataTypes.get(0))
-            : new LastDescAccumulator(inputDataTypes.get(0), false, false);
+            : new LastDescAccumulator(inputDataTypes.get(0), false, false, 
isAggTableScan);
       case FIRST:
         return ascending
-            ? new FirstAccumulator(inputDataTypes.get(0))
+            ? new FirstAccumulator(inputDataTypes.get(0), isAggTableScan)
             : new FirstDescAccumulator(inputDataTypes.get(0));
       case MAX:
         return new MaxAccumulator(inputDataTypes.get(0));
@@ -307,10 +322,17 @@ public class AccumulatorFactory {
         return ascending
             ? new LastByAccumulator(inputDataTypes.get(0), 
inputDataTypes.get(1), false, false)
             : new LastByDescAccumulator(
-                inputDataTypes.get(0), inputDataTypes.get(1), false, false, 
false, false);
+                inputDataTypes.get(0),
+                inputDataTypes.get(1),
+                false,
+                false,
+                false,
+                false,
+                isAggTableScan);
       case FIRST_BY:
         return ascending
-            ? new FirstByAccumulator(inputDataTypes.get(0), 
inputDataTypes.get(1), false, false)
+            ? new FirstByAccumulator(
+                inputDataTypes.get(0), inputDataTypes.get(1), false, false, 
isAggTableScan)
             : new FirstByDescAccumulator(
                 inputDataTypes.get(0), inputDataTypes.get(1), false, false);
       case MAX_BY:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstAccumulator.java
index b01349e6082..3e4111fd454 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstAccumulator.java
@@ -43,10 +43,12 @@ public class FirstAccumulator implements TableAccumulator {
   protected TsPrimitiveType firstValue;
   protected long minTime = Long.MAX_VALUE;
   protected boolean initResult = false;
+  private final boolean canFinishAfterInit;
 
-  public FirstAccumulator(TSDataType seriesDataType) {
+  public FirstAccumulator(TSDataType seriesDataType, boolean 
canFinishAfterInit) {
     this.seriesDataType = seriesDataType;
     firstValue = TsPrimitiveType.getByType(seriesDataType);
+    this.canFinishAfterInit = canFinishAfterInit;
   }
 
   @Override
@@ -56,7 +58,7 @@ public class FirstAccumulator implements TableAccumulator {
 
   @Override
   public TableAccumulator copy() {
-    return new FirstAccumulator(seriesDataType);
+    return new FirstAccumulator(seriesDataType, canFinishAfterInit);
   }
 
   @Override
@@ -196,7 +198,7 @@ public class FirstAccumulator implements TableAccumulator {
 
   @Override
   public boolean hasFinalResult() {
-    return initResult;
+    return canFinishAfterInit && initResult;
   }
 
   @Override
@@ -250,7 +252,9 @@ public class FirstAccumulator implements TableAccumulator {
       for (int i = 0; i < positionCount; i++) {
         if (!valueColumn.isNull(i)) {
           updateIntFirstValue(valueColumn.getInt(i), timeColumn.getLong(i));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     } else {
@@ -260,7 +264,9 @@ public class FirstAccumulator implements TableAccumulator {
         position = selectedPositions[i];
         if (!valueColumn.isNull(position)) {
           updateIntFirstValue(valueColumn.getInt(position), 
timeColumn.getLong(position));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     }
@@ -281,7 +287,9 @@ public class FirstAccumulator implements TableAccumulator {
       for (int i = 0; i < positionCount; i++) {
         if (!valueColumn.isNull(i)) {
           updateLongFirstValue(valueColumn.getLong(i), timeColumn.getLong(i));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     } else {
@@ -291,7 +299,9 @@ public class FirstAccumulator implements TableAccumulator {
         position = selectedPositions[i];
         if (!valueColumn.isNull(position)) {
           updateLongFirstValue(valueColumn.getLong(position), 
timeColumn.getLong(position));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     }
@@ -312,7 +322,9 @@ public class FirstAccumulator implements TableAccumulator {
       for (int i = 0; i < positionCount; i++) {
         if (!valueColumn.isNull(i)) {
           updateFloatFirstValue(valueColumn.getFloat(i), 
timeColumn.getLong(i));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     } else {
@@ -322,7 +334,9 @@ public class FirstAccumulator implements TableAccumulator {
         position = selectedPositions[i];
         if (!valueColumn.isNull(position)) {
           updateFloatFirstValue(valueColumn.getFloat(position), 
timeColumn.getLong(position));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     }
@@ -343,7 +357,9 @@ public class FirstAccumulator implements TableAccumulator {
       for (int i = 0; i < positionCount; i++) {
         if (!valueColumn.isNull(i)) {
           updateDoubleFirstValue(valueColumn.getDouble(i), 
timeColumn.getLong(i));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     } else {
@@ -353,7 +369,9 @@ public class FirstAccumulator implements TableAccumulator {
         position = selectedPositions[i];
         if (!valueColumn.isNull(position)) {
           updateDoubleFirstValue(valueColumn.getDouble(position), 
timeColumn.getLong(position));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     }
@@ -374,7 +392,9 @@ public class FirstAccumulator implements TableAccumulator {
       for (int i = 0; i < positionCount; i++) {
         if (!valueColumn.isNull(i)) {
           updateBinaryFirstValue(valueColumn.getBinary(i), 
timeColumn.getLong(i));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     } else {
@@ -384,7 +404,9 @@ public class FirstAccumulator implements TableAccumulator {
         position = selectedPositions[i];
         if (!valueColumn.isNull(position)) {
           updateBinaryFirstValue(valueColumn.getBinary(position), 
timeColumn.getLong(position));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     }
@@ -405,7 +427,9 @@ public class FirstAccumulator implements TableAccumulator {
       for (int i = 0; i < positionCount; i++) {
         if (!valueColumn.isNull(i)) {
           updateBooleanFirstValue(valueColumn.getBoolean(i), 
timeColumn.getLong(i));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     } else {
@@ -415,7 +439,9 @@ public class FirstAccumulator implements TableAccumulator {
         position = selectedPositions[i];
         if (!valueColumn.isNull(position)) {
           updateBooleanFirstValue(valueColumn.getBoolean(position), 
timeColumn.getLong(position));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByAccumulator.java
index 63c87bc65d1..e53bfd0bfd5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByAccumulator.java
@@ -40,11 +40,11 @@ public class FirstByAccumulator implements TableAccumulator 
{
   private static final long INSTANCE_SIZE =
       RamUsageEstimator.shallowSizeOfInstance(FirstByAccumulator.class);
 
-  private final TSDataType xDataType;
-  private final TSDataType yDataType;
+  protected final TSDataType xDataType;
+  protected final TSDataType yDataType;
 
-  private final boolean xIsTimeColumn;
-  private final boolean yIsTimeColumn;
+  protected final boolean xIsTimeColumn;
+  protected final boolean yIsTimeColumn;
 
   private long yFirstTime = Long.MAX_VALUE;
 
@@ -53,14 +53,21 @@ public class FirstByAccumulator implements TableAccumulator 
{
 
   private boolean initResult = false;
 
+  private final boolean canFinishAfterInit;
+
   public FirstByAccumulator(
-      TSDataType xDataType, TSDataType yDataType, boolean xIsTimeColumn, 
boolean yIsTimeColumn) {
+      TSDataType xDataType,
+      TSDataType yDataType,
+      boolean xIsTimeColumn,
+      boolean yIsTimeColumn,
+      boolean canFinishAfterInit) {
     this.xDataType = xDataType;
     this.yDataType = yDataType;
     this.xIsTimeColumn = xIsTimeColumn;
     this.yIsTimeColumn = yIsTimeColumn;
 
     this.xResult = TsPrimitiveType.getByType(xDataType);
+    this.canFinishAfterInit = canFinishAfterInit;
   }
 
   @Override
@@ -70,7 +77,8 @@ public class FirstByAccumulator implements TableAccumulator {
 
   @Override
   public TableAccumulator copy() {
-    return new FirstByAccumulator(xDataType, yDataType, xIsTimeColumn, 
yIsTimeColumn);
+    return new FirstByAccumulator(
+        xDataType, yDataType, xIsTimeColumn, yIsTimeColumn, 
canFinishAfterInit);
   }
 
   @Override
@@ -225,7 +233,7 @@ public class FirstByAccumulator implements TableAccumulator 
{
 
   @Override
   public boolean hasFinalResult() {
-    return initResult;
+    return canFinishAfterInit && initResult;
   }
 
   @Override
@@ -309,7 +317,9 @@ public class FirstByAccumulator implements TableAccumulator 
{
       for (int i = 0; i < positionCount; i++) {
         if (!yColumn.isNull(i)) {
           updateIntFirstValue(xColumn, i, timeColumn.getLong(i));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     } else {
@@ -319,7 +329,9 @@ public class FirstByAccumulator implements TableAccumulator 
{
         position = selectedPositions[i];
         if (!yColumn.isNull(position)) {
           updateIntFirstValue(xColumn, position, timeColumn.getLong(position));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     }
@@ -355,7 +367,9 @@ public class FirstByAccumulator implements TableAccumulator 
{
       for (int i = 0; i < positionCount; i++) {
         if (!yColumn.isNull(i)) {
           updateLongFirstValue(xColumn, i, timeColumn.getLong(i));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     } else {
@@ -365,7 +379,9 @@ public class FirstByAccumulator implements TableAccumulator 
{
         position = selectedPositions[i];
         if (!yColumn.isNull(position)) {
           updateLongFirstValue(xColumn, position, 
timeColumn.getLong(position));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     }
@@ -401,7 +417,9 @@ public class FirstByAccumulator implements TableAccumulator 
{
       for (int i = 0; i < positionCount; i++) {
         if (!yColumn.isNull(i)) {
           updateFloatFirstValue(xColumn, i, timeColumn.getLong(i));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     } else {
@@ -411,7 +429,9 @@ public class FirstByAccumulator implements TableAccumulator 
{
         position = selectedPositions[i];
         if (!yColumn.isNull(position)) {
           updateFloatFirstValue(xColumn, position, 
timeColumn.getLong(position));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     }
@@ -447,7 +467,9 @@ public class FirstByAccumulator implements TableAccumulator 
{
       for (int i = 0; i < positionCount; i++) {
         if (!yColumn.isNull(i)) {
           updateDoubleFirstValue(xColumn, i, timeColumn.getLong(i));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     } else {
@@ -457,7 +479,9 @@ public class FirstByAccumulator implements TableAccumulator 
{
         position = selectedPositions[i];
         if (!yColumn.isNull(position)) {
           updateDoubleFirstValue(xColumn, position, 
timeColumn.getLong(position));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     }
@@ -493,7 +517,9 @@ public class FirstByAccumulator implements TableAccumulator 
{
       for (int i = 0; i < positionCount; i++) {
         if (!yColumn.isNull(i)) {
           updateBinaryFirstValue(xColumn, i, timeColumn.getLong(i));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     } else {
@@ -503,7 +529,9 @@ public class FirstByAccumulator implements TableAccumulator 
{
         position = selectedPositions[i];
         if (!yColumn.isNull(position)) {
           updateBinaryFirstValue(xColumn, position, 
timeColumn.getLong(position));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     }
@@ -539,7 +567,9 @@ public class FirstByAccumulator implements TableAccumulator 
{
       for (int i = 0; i < positionCount; i++) {
         if (!yColumn.isNull(i)) {
           updateBooleanFirstValue(xColumn, i, timeColumn.getLong(i));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     } else {
@@ -549,7 +579,9 @@ public class FirstByAccumulator implements TableAccumulator 
{
         position = selectedPositions[i];
         if (!yColumn.isNull(position)) {
           updateBooleanFirstValue(xColumn, position, 
timeColumn.getLong(position));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByDescAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByDescAccumulator.java
index 130ffbb676d..c6f5fdb9a76 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByDescAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByDescAccumulator.java
@@ -26,7 +26,7 @@ public class FirstByDescAccumulator extends 
FirstByAccumulator {
 
   public FirstByDescAccumulator(
       TSDataType xDataType, TSDataType yDataType, boolean xIsTimeColumn, 
boolean yIsTimeColumn) {
-    super(xDataType, yDataType, xIsTimeColumn, yIsTimeColumn);
+    super(xDataType, yDataType, xIsTimeColumn, yIsTimeColumn, false);
   }
 
   @Override
@@ -34,6 +34,11 @@ public class FirstByDescAccumulator extends 
FirstByAccumulator {
     return false;
   }
 
+  @Override
+  public TableAccumulator copy() {
+    return new FirstByDescAccumulator(xDataType, yDataType, xIsTimeColumn, 
yIsTimeColumn);
+  }
+
   @Override
   protected void addIntInput(
       Column xColumn, Column yColumn, Column timeColumn, AggregationMask mask) 
{
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstDescAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstDescAccumulator.java
index 009be6c778b..d8cc8ef0286 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstDescAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstDescAccumulator.java
@@ -25,7 +25,7 @@ import org.apache.tsfile.enums.TSDataType;
 public class FirstDescAccumulator extends FirstAccumulator {
 
   public FirstDescAccumulator(TSDataType seriesDataType) {
-    super(seriesDataType);
+    super(seriesDataType, false);
   }
 
   @Override
@@ -33,6 +33,11 @@ public class FirstDescAccumulator extends FirstAccumulator {
     return false;
   }
 
+  @Override
+  public TableAccumulator copy() {
+    return new FirstDescAccumulator(seriesDataType);
+  }
+
   @Override
   protected void addIntInput(Column valueColumn, Column timeColumn, 
AggregationMask mask) {
     int positionCount = mask.getSelectedPositionCount();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
index 04bf1213f61..37ac6c79036 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
@@ -25,6 +25,7 @@ import org.apache.tsfile.enums.TSDataType;
 public class LastByDescAccumulator extends LastByAccumulator {
   private final boolean xIsMeasurementColumn;
   private final boolean yIsMeasurementColumn;
+  private final boolean canFinishAfterInit;
 
   public LastByDescAccumulator(
       TSDataType xDataType,
@@ -32,10 +33,12 @@ public class LastByDescAccumulator extends 
LastByAccumulator {
       boolean xIsTimeColumn,
       boolean yIsTimeColumn,
       boolean xIsMeasurementColumn,
-      boolean yIsMeasurementColumn) {
+      boolean yIsMeasurementColumn,
+      boolean canFinishAfterInit) {
     super(xDataType, yDataType, xIsTimeColumn, yIsTimeColumn);
     this.xIsMeasurementColumn = xIsMeasurementColumn;
     this.yIsMeasurementColumn = yIsMeasurementColumn;
+    this.canFinishAfterInit = canFinishAfterInit;
   }
 
   public boolean xIsTimeColumn() {
@@ -62,12 +65,13 @@ public class LastByDescAccumulator extends 
LastByAccumulator {
         xIsTimeColumn,
         yIsTimeColumn,
         xIsMeasurementColumn,
-        yIsMeasurementColumn);
+        yIsMeasurementColumn,
+        canFinishAfterInit);
   }
 
   @Override
   public boolean hasFinalResult() {
-    return initResult;
+    return canFinishAfterInit && initResult;
   }
 
   @Override
@@ -79,7 +83,9 @@ public class LastByDescAccumulator extends LastByAccumulator {
       for (int i = 0; i < positionCount; i++) {
         if (!yColumn.isNull(i)) {
           updateIntLastValue(xColumn, i, timeColumn.getLong(i));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     } else {
@@ -89,7 +95,9 @@ public class LastByDescAccumulator extends LastByAccumulator {
         position = selectedPositions[i];
         if (!yColumn.isNull(position)) {
           updateIntLastValue(xColumn, position, timeColumn.getLong(position));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     }
@@ -104,7 +112,9 @@ public class LastByDescAccumulator extends 
LastByAccumulator {
       for (int i = 0; i < positionCount; i++) {
         if (!yColumn.isNull(i)) {
           updateLongLastValue(xColumn, i, timeColumn.getLong(i));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     } else {
@@ -114,7 +124,9 @@ public class LastByDescAccumulator extends 
LastByAccumulator {
         position = selectedPositions[i];
         if (!yColumn.isNull(position)) {
           updateLongLastValue(xColumn, position, timeColumn.getLong(position));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     }
@@ -129,7 +141,9 @@ public class LastByDescAccumulator extends 
LastByAccumulator {
       for (int i = 0; i < positionCount; i++) {
         if (!yColumn.isNull(i)) {
           updateFloatLastValue(xColumn, i, timeColumn.getLong(i));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     } else {
@@ -139,7 +153,9 @@ public class LastByDescAccumulator extends 
LastByAccumulator {
         position = selectedPositions[i];
         if (!yColumn.isNull(position)) {
           updateFloatLastValue(xColumn, position, 
timeColumn.getLong(position));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     }
@@ -154,7 +170,9 @@ public class LastByDescAccumulator extends 
LastByAccumulator {
       for (int i = 0; i < positionCount; i++) {
         if (!yColumn.isNull(i)) {
           updateDoubleLastValue(xColumn, i, timeColumn.getLong(i));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     } else {
@@ -164,7 +182,9 @@ public class LastByDescAccumulator extends 
LastByAccumulator {
         position = selectedPositions[i];
         if (!yColumn.isNull(position)) {
           updateDoubleLastValue(xColumn, position, 
timeColumn.getLong(position));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     }
@@ -179,7 +199,9 @@ public class LastByDescAccumulator extends 
LastByAccumulator {
       for (int i = 0; i < positionCount; i++) {
         if (!yColumn.isNull(i)) {
           updateBinaryLastValue(xColumn, i, timeColumn.getLong(i));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     } else {
@@ -189,7 +211,9 @@ public class LastByDescAccumulator extends 
LastByAccumulator {
         position = selectedPositions[i];
         if (!yColumn.isNull(position)) {
           updateBinaryLastValue(xColumn, position, 
timeColumn.getLong(position));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     }
@@ -204,7 +228,9 @@ public class LastByDescAccumulator extends 
LastByAccumulator {
       for (int i = 0; i < positionCount; i++) {
         if (!yColumn.isNull(i)) {
           updateBooleanLastValue(xColumn, i, timeColumn.getLong(i));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     } else {
@@ -214,7 +240,9 @@ public class LastByDescAccumulator extends 
LastByAccumulator {
         position = selectedPositions[i];
         if (!yColumn.isNull(position)) {
           updateBooleanLastValue(xColumn, position, 
timeColumn.getLong(position));
-          return;
+          if (canFinishAfterInit) {
+            return;
+          }
         }
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
index 23ee0d21054..d015b759957 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
@@ -25,12 +25,17 @@ import org.apache.tsfile.enums.TSDataType;
 public class LastDescAccumulator extends LastAccumulator {
   private final boolean isTimeColumn;
   private final boolean isMeasurementColumn;
+  private final boolean canFinishAfterInit;
 
   public LastDescAccumulator(
-      TSDataType seriesDataType, boolean isTimeColumn, boolean 
isMeasurementColumn) {
+      TSDataType seriesDataType,
+      boolean isTimeColumn,
+      boolean isMeasurementColumn,
+      boolean canFinishAfterInit) {
     super(seriesDataType);
     this.isTimeColumn = isTimeColumn;
     this.isMeasurementColumn = isMeasurementColumn;
+    this.canFinishAfterInit = canFinishAfterInit;
   }
 
   public boolean isTimeColumn() {
@@ -43,12 +48,13 @@ public class LastDescAccumulator extends LastAccumulator {
 
   @Override
   public TableAccumulator copy() {
-    return new LastDescAccumulator(seriesDataType, isTimeColumn, 
isMeasurementColumn);
+    return new LastDescAccumulator(
+        seriesDataType, isTimeColumn, isMeasurementColumn, canFinishAfterInit);
   }
 
   @Override
   public boolean hasFinalResult() {
-    return initResult;
+    return canFinishAfterInit && initResult;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 7e308dd8ca6..d6c2e53c24b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -2531,6 +2531,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
                         node.getStep(),
                         typeProvider,
                         true,
+                        false,
                         null,
                         Collections.emptySet())));
     return new AggregationOperator(operatorContext, child, 
aggregatorBuilder.build());
@@ -2544,6 +2545,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
       AggregationNode.Step step,
       TypeProvider typeProvider,
       boolean scanAscending,
+      boolean isAggTableScan,
       String timeColumnName,
       Set<String> measurementColumnNames) {
     List<Integer> argumentChannels = new ArrayList<>();
@@ -2565,6 +2567,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
             aggregation.getArguments(),
             Collections.emptyMap(),
             scanAscending,
+            isAggTableScan,
             timeColumnName,
             measurementColumnNames,
             aggregation.isDistinct());
@@ -2610,6 +2613,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
                             node.getStep(),
                             typeProvider,
                             true,
+                            false,
                             null,
                             Collections.emptySet())));
 
@@ -2905,6 +2909,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
               node.getStep(),
               context.getTypeProvider(),
               scanAscending,
+              true,
               timeColumnName,
               measurementColumnsIndexMap.keySet()));
     }
@@ -3370,7 +3375,8 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
             originalArgumentTypes,
             
arguments.stream().map(Map.Entry::getKey).collect(Collectors.toList()),
             Collections.emptyMap(),
-            true);
+            true,
+            false);
 
     BoundSignature signature = resolvedFunction.getSignature();
 
@@ -4070,7 +4076,8 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
             originalArgumentTypes,
             function.getArguments(),
             Collections.emptyMap(),
-            true);
+            true,
+            false);
 
     // Create aggregator by accumulator
     return new WindowAggregator(
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/FunctionTestUtils.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/FunctionTestUtils.java
index ccec0f68563..47f9063d899 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/FunctionTestUtils.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/FunctionTestUtils.java
@@ -105,7 +105,8 @@ public class FunctionTestUtils {
             Collections.singletonList(inputDataType),
             new ArrayList<>(),
             new HashMap<>(),
-            ascending);
+            ascending,
+            false);
     WindowAggregator aggregator =
         new WindowAggregator(accumulator, outputDataType, 
Collections.singletonList(0));
     return new AggregationWindowFunction(aggregator);

Reply via email to