This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new b8852699a49 Fix operator tests for transient null TsBlocks (#17838) (#17847)
b8852699a49 is described below

commit b8852699a495458169e7078a5554d92275e06545
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 5 11:44:40 2026 +0800

    Fix operator tests for transient null TsBlocks (#17838) (#17847)
---
 .../AlignedSeriesAggregationScanOperatorTest.java  | 33 +++++++-------
 .../execution/operator/FillOperatorTest.java       |  7 +--
 .../operator/HorizontallyConcatOperatorTest.java   |  3 +-
 .../execution/operator/LinearFillOperatorTest.java |  5 +-
 .../execution/operator/MergeSortOperatorTest.java  |  3 +-
 .../execution/operator/OffsetOperatorTest.java     | 19 ++++++--
 .../execution/operator/OperatorTestUtils.java      | 53 ++++++++++++++++++++++
 .../SeriesAggregationScanOperatorTest.java         | 33 +++++++-------
 .../execution/operator/SeriesScanOperatorTest.java |  3 +-
 .../operator/UpdateLastCacheOperatorTest.java      |  7 +--
 10 files changed, 118 insertions(+), 48 deletions(-)

diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
index 2a5c5c13d0b..bc81047b17a 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
@@ -69,6 +69,7 @@ import java.util.stream.Collectors;
 import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -122,7 +123,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
         initAlignedSeriesAggregationScanOperator(aggregators, null, true, 
null);
     int count = 0;
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       for (int i = 0; i < measurementSchemas.size(); i++) {
         assertEquals(500, resultTsBlock.getColumn(i).getLong(0));
       }
@@ -153,7 +154,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
         initAlignedSeriesAggregationScanOperator(aggregators, null, false, 
null);
     int count = 0;
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       for (int i = 0; i < measurementSchemas.size(); i++) {
         assertEquals(500, resultTsBlock.getColumn(i).getLong(0));
       }
@@ -189,7 +190,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
         initAlignedSeriesAggregationScanOperator(aggregators, null, true, 
null);
     int count = 0;
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
       assertEquals(6524750.0, resultTsBlock.getColumn(1).getDouble(0), 0.0001);
       count++;
@@ -227,7 +228,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
         initAlignedSeriesAggregationScanOperator(aggregators, null, true, 
null);
     int count = 0;
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       assertTrue(resultTsBlock.getColumn(0).getBoolean(0));
       assertEquals(10499, resultTsBlock.getColumn(1).getInt(0));
       assertEquals(20199, resultTsBlock.getColumn(2).getLong(0));
@@ -269,7 +270,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
         initAlignedSeriesAggregationScanOperator(aggregators, null, false, 
null);
     int count = 0;
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       assertTrue(resultTsBlock.getColumn(0).getBoolean(0));
       assertEquals(10499, resultTsBlock.getColumn(1).getInt(0));
       assertEquals(20199, resultTsBlock.getColumn(2).getLong(0));
@@ -304,7 +305,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
         initAlignedSeriesAggregationScanOperator(aggregators, timeFilter, 
true, null);
     int count = 0;
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       for (int i = 0; i < measurementSchemas.size(); i++) {
         assertEquals(resultTsBlock.getColumn(i).getLong(0), 380);
       }
@@ -337,7 +338,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
         initAlignedSeriesAggregationScanOperator(aggregators, timeFilter, 
true, null);
     int count = 0;
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       for (int i = 0; i < measurementSchemas.size(); i++) {
         assertEquals(resultTsBlock.getColumn(i).getLong(0), 380);
       }
@@ -370,7 +371,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
         initAlignedSeriesAggregationScanOperator(aggregators, timeFilter, 
true, null);
     int count = 0;
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       for (int i = 0; i < measurementSchemas.size(); i++) {
         assertEquals(resultTsBlock.getColumn(i).getLong(0), 300);
       }
@@ -410,7 +411,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
         initAlignedSeriesAggregationScanOperator(aggregators, timeFilter, 
true, null);
     int count = 0;
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       assertTrue(resultTsBlock.getColumn(0).getBoolean(0));
       assertEquals(399, resultTsBlock.getColumn(1).getInt(0));
       assertEquals(20199, resultTsBlock.getColumn(2).getLong(0));
@@ -448,7 +449,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
         initAlignedSeriesAggregationScanOperator(aggregators, null, true, 
groupByTimeParameter);
     int count = 0;
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       int positionCount = resultTsBlock.getPositionCount();
       for (int pos = 0; pos < positionCount; pos++) {
         assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -489,7 +490,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
             aggregators, timeFilter, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       int positionCount = resultTsBlock.getPositionCount();
       for (int pos = 0; pos < positionCount; pos++) {
         assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -533,7 +534,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
         initAlignedSeriesAggregationScanOperator(aggregators, null, true, 
groupByTimeParameter);
     int count = 0;
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       int positionCount = resultTsBlock.getPositionCount();
       for (int pos = 0; pos < positionCount; pos++) {
         assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -578,7 +579,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
         initAlignedSeriesAggregationScanOperator(aggregators, null, false, 
groupByTimeParameter);
     int count = 0;
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       int positionCount = resultTsBlock.getPositionCount();
       for (int pos = 0; pos < positionCount; pos++) {
         assertEquals(100 * (3 - count), 
resultTsBlock.getTimeColumn().getLong(pos));
@@ -613,7 +614,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
         initAlignedSeriesAggregationScanOperator(aggregators, null, true, 
groupByTimeParameter);
     int count = 0;
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       int positionCount = resultTsBlock.getPositionCount();
       for (int pos = 0; pos < positionCount; pos++) {
         assertEquals(50 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -645,7 +646,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
         initAlignedSeriesAggregationScanOperator(aggregators, null, true, 
groupByTimeParameter);
     int count = 0;
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       int positionCount = resultTsBlock.getPositionCount();
       for (int pos = 0; pos < positionCount; pos++) {
         assertEquals(timeColumn[count], 
resultTsBlock.getTimeColumn().getLong(pos));
@@ -688,7 +689,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
         initAlignedSeriesAggregationScanOperator(aggregators, null, true, 
groupByTimeParameter);
     int count = 0;
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       int positionCount = resultTsBlock.getPositionCount();
       for (int pos = 0; pos < positionCount; pos++) {
         assertEquals(timeColumn[count], 
resultTsBlock.getTimeColumn().getLong(pos));
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java
index e2bb5a655ec..ac3c83a8364 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java
@@ -42,6 +42,7 @@ import org.junit.Test;
 import java.util.concurrent.ExecutorService;
 
 import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -209,7 +210,7 @@ public class FillOperatorTest {
             }
           };
       while (fillOperator.hasNext()) {
-        TsBlock block = fillOperator.next();
+        TsBlock block = nextNonNullOrEmpty(fillOperator);
         for (int i = 0; i < block.getPositionCount(); i++) {
           long expectedTime = i + 1 + count * 10000L;
           assertEquals(expectedTime, block.getTimeByIndex(i));
@@ -386,7 +387,7 @@ public class FillOperatorTest {
             }
           };
       while (fillOperator.hasNext()) {
-        TsBlock block = fillOperator.next();
+        TsBlock block = nextNonNullOrEmpty(fillOperator);
         for (int i = 0; i < block.getPositionCount(); i++) {
           long expectedTime = i + 1 + count * 10000L;
           assertEquals(expectedTime, block.getTimeByIndex(i));
@@ -563,7 +564,7 @@ public class FillOperatorTest {
             }
           };
       while (fillOperator.hasNext()) {
-        TsBlock block = fillOperator.next();
+        TsBlock block = nextNonNullOrEmpty(fillOperator);
         for (int i = 0; i < block.getPositionCount(); i++) {
           long expectedTime = i + 1 + count * 10000L;
           assertEquals(expectedTime, block.getTimeByIndex(i));
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java
index bad50a7e1c9..ec258a36c0e 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java
@@ -66,6 +66,7 @@ import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -197,7 +198,7 @@ public class HorizontallyConcatOperatorTest {
       int count = 0;
       while (horizontallyConcatOperator.isBlocked().isDone()
           && horizontallyConcatOperator.hasNext()) {
-        TsBlock tsBlock = horizontallyConcatOperator.next();
+        TsBlock tsBlock = nextNonNullOrEmpty(horizontallyConcatOperator);
         assertEquals(6, tsBlock.getValueColumnCount());
         for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
           assertEquals(count, tsBlock.getTimeByIndex(i));
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LinearFillOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LinearFillOperatorTest.java
index 08859dc127b..8ea63f132e9 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LinearFillOperatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LinearFillOperatorTest.java
@@ -41,8 +41,8 @@ import org.junit.Test;
 import java.util.concurrent.ExecutorService;
 
 import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 public class LinearFillOperatorTest {
@@ -1301,8 +1301,7 @@ public class LinearFillOperatorTest {
           };
 
       while (fillOperator.hasNext()) {
-        TsBlock block = fillOperator.next();
-        assertNotNull(block);
+        TsBlock block = nextNonNullOrEmpty(fillOperator);
         for (int i = 0; i < block.getPositionCount(); i++) {
           long expectedTime = i + count;
           assertEquals(expectedTime, block.getTimeByIndex(i));
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
index 76ec370ad98..9afc9c7c540 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
@@ -88,6 +88,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -1765,7 +1766,7 @@ public class MergeSortOperatorTest {
 
     int index = 0;
     while (mergeSortOperator.isBlocked().isDone() && 
mergeSortOperator.hasNext()) {
-      TsBlock result = mergeSortOperator.next();
+      TsBlock result = nextNonNullOrEmpty(mergeSortOperator);
       for (int i = 0; i < result.getPositionCount(); i++) {
         long time = result.getTimeByIndex(i);
         assertEquals(time, ans[index++]);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java
index a1188becf10..6602c65ff98 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java
@@ -62,6 +62,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -161,7 +162,7 @@ public class OffsetOperatorTest {
           new LimitOperator(driverContext.getOperatorContexts().get(4), 250, 
offsetOperator);
       int count = 100;
       while (limitOperator.isBlocked().isDone() && limitOperator.hasNext()) {
-        TsBlock tsBlock = limitOperator.next();
+        TsBlock tsBlock = nextNonNullOrEmpty(limitOperator);
         assertEquals(2, tsBlock.getValueColumnCount());
         assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
         assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
@@ -265,7 +266,7 @@ public class OffsetOperatorTest {
 
       int count = 0;
       while (offsetOperator.isBlocked().isDone() && offsetOperator.hasNext()) {
-        TsBlock tsBlock = offsetOperator.next();
+        TsBlock tsBlock = nextNonNullOrEmpty(offsetOperator);
         assertEquals(2, tsBlock.getValueColumnCount());
         assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
         assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
@@ -367,7 +368,7 @@ public class OffsetOperatorTest {
           new OffsetOperator(driverContext.getOperatorContexts().get(3), 500, 
timeJoinOperator);
 
       while (offsetOperator.isBlocked().isDone() && offsetOperator.hasNext()) {
-        TsBlock tsBlock = offsetOperator.next();
+        TsBlock tsBlock = nextNonNull(offsetOperator);
         assertEquals(2, tsBlock.getValueColumnCount());
         assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
         assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
@@ -455,7 +456,7 @@ public class OffsetOperatorTest {
               driverContext.getOperatorContexts().get(3), 98_784_247_808L, 
timeJoinOperator);
 
       while (offsetOperator.isBlocked().isDone() && offsetOperator.hasNext()) {
-        TsBlock tsBlock = offsetOperator.next();
+        TsBlock tsBlock = nextNonNull(offsetOperator);
         assertEquals(2, tsBlock.getValueColumnCount());
         assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
         assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
@@ -468,4 +469,14 @@ public class OffsetOperatorTest {
       instanceNotificationExecutor.shutdown();
     }
   }
+
+  private static TsBlock nextNonNull(Operator operator) throws Exception {
+    while (operator.hasNext()) {
+      TsBlock result = operator.next();
+      if (result != null) {
+        return result;
+      }
+    }
+    throw new AssertionError("Expected a non-null TsBlock from operator");
+  }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorTestUtils.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorTestUtils.java
new file mode 100644
index 00000000000..37b979e87be
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorTestUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.tsfile.read.common.block.TsBlock;
+
+public final class OperatorTestUtils {
+
+  private OperatorTestUtils() {
+    // Utility class.
+  }
+
+  public static TsBlock nextNonNullOrEmpty(Operator operator) throws Exception 
{
+    while (operator.hasNext()) {
+      TsBlock result = operator.next();
+      if (!isNullOrEmpty(result)) {
+        return result;
+      }
+    }
+    throw new AssertionError("Expected a non-null and non-empty TsBlock from 
operator");
+  }
+
+  public static TsBlock lastNonNullOrEmpty(Operator operator) throws Exception 
{
+    TsBlock result = null;
+    while (operator.isBlocked().isDone() && operator.hasNext()) {
+      TsBlock nextResult = operator.next();
+      if (!isNullOrEmpty(nextResult)) {
+        result = nextResult;
+      }
+    }
+    return result;
+  }
+
+  private static boolean isNullOrEmpty(TsBlock tsBlock) {
+    return tsBlock == null || tsBlock.getPositionCount() == 0;
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesAggregationScanOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesAggregationScanOperatorTest.java
index bbf96e89fb4..afa08223d64 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesAggregationScanOperatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesAggregationScanOperatorTest.java
@@ -66,6 +66,7 @@ import java.util.concurrent.ExecutorService;
 import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
 import static org.junit.Assert.assertEquals;
 
 public class SeriesAggregationScanOperatorTest {
@@ -111,7 +112,7 @@ public class SeriesAggregationScanOperatorTest {
     int count = 0;
 
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
       count++;
     }
@@ -135,7 +136,7 @@ public class SeriesAggregationScanOperatorTest {
     int count = 0;
 
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
       count++;
     }
@@ -161,7 +162,7 @@ public class SeriesAggregationScanOperatorTest {
     int count = 0;
 
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
       assertEquals(6524750.0, resultTsBlock.getColumn(1).getDouble(0), 0.0001);
       count++;
@@ -192,7 +193,7 @@ public class SeriesAggregationScanOperatorTest {
     int count = 0;
 
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       assertEquals(20000, resultTsBlock.getColumn(0).getInt(0));
       assertEquals(10499, resultTsBlock.getColumn(1).getInt(0));
       assertEquals(0, resultTsBlock.getColumn(2).getLong(0));
@@ -228,7 +229,7 @@ public class SeriesAggregationScanOperatorTest {
     int count = 0;
 
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       assertEquals(20000, resultTsBlock.getColumn(0).getInt(0));
       assertEquals(10499, resultTsBlock.getColumn(1).getInt(0));
       assertEquals(0, resultTsBlock.getColumn(2).getLong(0));
@@ -259,7 +260,7 @@ public class SeriesAggregationScanOperatorTest {
     int count = 0;
 
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       assertEquals(resultTsBlock.getColumn(0).getLong(0), 380);
       count++;
     }
@@ -285,7 +286,7 @@ public class SeriesAggregationScanOperatorTest {
     int count = 0;
 
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       assertEquals(resultTsBlock.getColumn(0).getLong(0), 380);
       count++;
     }
@@ -311,7 +312,7 @@ public class SeriesAggregationScanOperatorTest {
     int count = 0;
 
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       assertEquals(resultTsBlock.getColumn(0).getLong(0), 300);
       count++;
     }
@@ -343,7 +344,7 @@ public class SeriesAggregationScanOperatorTest {
     int count = 0;
 
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       assertEquals(20100, resultTsBlock.getColumn(0).getInt(0));
       assertEquals(399, resultTsBlock.getColumn(1).getInt(0));
       assertEquals(100, resultTsBlock.getColumn(2).getLong(0));
@@ -375,7 +376,7 @@ public class SeriesAggregationScanOperatorTest {
     int count = 0;
 
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       int positionCount = resultTsBlock.getPositionCount();
       for (int pos = 0; pos < positionCount; pos++) {
         assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -408,7 +409,7 @@ public class SeriesAggregationScanOperatorTest {
     int count = 0;
 
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       int positionCount = resultTsBlock.getPositionCount();
       for (int pos = 0; pos < positionCount; pos++) {
         assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -450,7 +451,7 @@ public class SeriesAggregationScanOperatorTest {
     int count = 0;
 
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       int positionCount = resultTsBlock.getPositionCount();
       for (int pos = 0; pos < positionCount; pos++) {
         assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -495,7 +496,7 @@ public class SeriesAggregationScanOperatorTest {
     int count = 0;
 
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       int positionCount = resultTsBlock.getPositionCount();
       for (int pos = 0; pos < positionCount; pos++) {
         assertEquals(100 * (3 - count), 
resultTsBlock.getTimeColumn().getLong(pos));
@@ -530,7 +531,7 @@ public class SeriesAggregationScanOperatorTest {
     int count = 0;
 
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       int positionCount = resultTsBlock.getPositionCount();
       for (int pos = 0; pos < positionCount; pos++) {
         assertEquals(50 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -563,7 +564,7 @@ public class SeriesAggregationScanOperatorTest {
     int count = 0;
 
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = 
nextNonNullOrEmpty(seriesAggregationScanOperator);
       int positionCount = resultTsBlock.getPositionCount();
       for (int pos = 0; pos < positionCount; pos++) {
         assertEquals(timeColumn[count], 
resultTsBlock.getTimeColumn().getLong(pos));
@@ -606,7 +607,7 @@ public class SeriesAggregationScanOperatorTest {
     int count = 0;
 
     while (seriesAggregationScanOperator.hasNext()) {
-      TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+      TsBlock resultTsBlock = nextNonNullOrEmpty(seriesAggregationScanOperator);
       int positionCount = resultTsBlock.getPositionCount();
       for (int pos = 0; pos < positionCount; pos++) {
         assertEquals(timeColumn[count], resultTsBlock.getTimeColumn().getLong(pos));
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesScanOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesScanOperatorTest.java
index 11daae6f1c5..39455ab4208 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesScanOperatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesScanOperatorTest.java
@@ -55,6 +55,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -114,7 +115,7 @@ public class SeriesScanOperatorTest {
 
       int count = 0;
       while (seriesScanOperator.hasNext()) {
-        TsBlock tsBlock = seriesScanOperator.next();
+        TsBlock tsBlock = nextNonNullOrEmpty(seriesScanOperator);
         assertEquals(1, tsBlock.getValueColumnCount());
         assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
         for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/UpdateLastCacheOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/UpdateLastCacheOperatorTest.java
index 0c9c47815ae..30604831e26 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/UpdateLastCacheOperatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/UpdateLastCacheOperatorTest.java
@@ -62,6 +62,7 @@ import java.util.concurrent.ExecutorService;
 import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.apache.iotdb.db.queryengine.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
 import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -103,7 +104,7 @@ public class UpdateLastCacheOperatorTest {
 
       assertTrue(updateLastCacheOperator.isBlocked().isDone());
       assertTrue(updateLastCacheOperator.hasNext());
-      TsBlock result = updateLastCacheOperator.next();
+      TsBlock result = nextNonNullOrEmpty(updateLastCacheOperator);
       assertEquals(1, result.getPositionCount());
       assertEquals(3, result.getValueColumnCount());
 
@@ -133,7 +134,7 @@ public class UpdateLastCacheOperatorTest {
 
       assertTrue(updateLastCacheOperator.isBlocked().isDone());
       assertTrue(updateLastCacheOperator.hasNext());
-      TsBlock result = updateLastCacheOperator.next();
+      TsBlock result = nextNonNullOrEmpty(updateLastCacheOperator);
       assertEquals(1, result.getPositionCount());
       assertEquals(3, result.getValueColumnCount());
 
@@ -163,7 +164,7 @@ public class UpdateLastCacheOperatorTest {
 
       assertTrue(updateLastCacheOperator.isBlocked().isDone());
       assertTrue(updateLastCacheOperator.hasNext());
-      TsBlock result = updateLastCacheOperator.next();
+      TsBlock result = nextNonNullOrEmpty(updateLastCacheOperator);
       assertEquals(1, result.getPositionCount());
       assertEquals(3, result.getValueColumnCount());
 


Reply via email to