This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 703c7f427a [IOTDB-3627] Fix issue of slidingTimeWindowQuery  (#6567)
703c7f427a is described below

commit 703c7f427a473ada5cd741619c27e406769d1f1e
Author: lancelly <[email protected]>
AuthorDate: Tue Jul 5 10:55:49 2022 +0800

    [IOTDB-3627] Fix issue of slidingTimeWindowQuery  (#6567)
---
 .../org/apache/iotdb/db/it/IoTDBNestedQueryIT.java |  1 -
 .../iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java     | 19 ++------
 .../operator/process/TransformOperator.java        |  9 ++--
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  1 -
 ...InputColumnMultiReferenceIntermediateLayer.java | 52 +++++++++++++++-------
 ...nputColumnSingleReferenceIntermediateLayer.java |  4 +-
 6 files changed, 49 insertions(+), 37 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBNestedQueryIT.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBNestedQueryIT.java
index 9110a2c7e4..27d860828a 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBNestedQueryIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBNestedQueryIT.java
@@ -247,7 +247,6 @@ public class IoTDBNestedQueryIT {
   }
 
   @Test
-  @Ignore
   public void testNestedWindowingFunctionExpressions() {
     final int[] windows =
         new int[] {
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
index 7458e4e202..8c8007629d 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
@@ -25,11 +25,7 @@ import org.apache.iotdb.itbase.category.ClusterIT;
 import org.apache.iotdb.itbase.category.LocalStandaloneIT;
 import org.apache.iotdb.itbase.constant.UDFTestConstant;
 
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.*;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
@@ -45,7 +41,7 @@ import static org.junit.Assert.fail;
 @Category({LocalStandaloneIT.class, ClusterIT.class})
 public class IoTDBUDFWindowQueryIT {
 
-  protected static final int ITERATION_TIMES = 100;
+  protected static final int ITERATION_TIMES = 10000;
 
   protected static boolean enableSeqSpaceCompaction;
   protected static boolean enableUnseqSpaceCompaction;
@@ -176,6 +172,7 @@ public class IoTDBUDFWindowQueryIT {
     testSlidingSizeWindow(3 * ITERATION_TIMES);
   }
 
+  // todo: remove ignore when exception handler in IT finishes
   @Test
   @Ignore
   public void testSlidingSizeWindow7() {
@@ -259,7 +256,6 @@ public class IoTDBUDFWindowQueryIT {
   }
 
   @Test
-  @Ignore
   public void testSlidingTimeWindow1() {
     testSlidingTimeWindow(
         (int) (0.33 * ITERATION_TIMES),
@@ -269,7 +265,6 @@ public class IoTDBUDFWindowQueryIT {
   }
 
   @Test
-  @Ignore
   public void testSlidingTimeWindow2() {
     testSlidingTimeWindow(
         (int) (0.033 * ITERATION_TIMES),
@@ -279,7 +274,6 @@ public class IoTDBUDFWindowQueryIT {
   }
 
   @Test
-  @Ignore
   public void testSlidingTimeWindow3() {
     testSlidingTimeWindow(
         (int) (2 * 0.033 * ITERATION_TIMES),
@@ -289,7 +283,6 @@ public class IoTDBUDFWindowQueryIT {
   }
 
   @Test
-  @Ignore
   public void testSlidingTimeWindow4() {
     testSlidingTimeWindow(
         (int) (0.033 * ITERATION_TIMES),
@@ -299,27 +292,23 @@ public class IoTDBUDFWindowQueryIT {
   }
 
   @Test
-  @Ignore
   public void testSlidingTimeWindow5() {
     testSlidingTimeWindow(ITERATION_TIMES, ITERATION_TIMES, 0, 
ITERATION_TIMES);
   }
 
   @Test
-  @Ignore
   public void testSlidingTimeWindow6() {
     testSlidingTimeWindow(
         (int) (1.01 * ITERATION_TIMES), (int) (0.01 * ITERATION_TIMES), 0, 
ITERATION_TIMES / 2);
   }
 
   @Test
-  @Ignore
   public void testSlidingTimeWindow7() {
     testSlidingTimeWindow(
         (int) (0.01 * ITERATION_TIMES), (int) (1.01 * ITERATION_TIMES), 0, 
ITERATION_TIMES / 2);
   }
 
   @Test
-  @Ignore
   public void testSlidingTimeWindow8() {
     testSlidingTimeWindow(
         (int) (1.01 * ITERATION_TIMES), (int) (1.01 * ITERATION_TIMES), 0, 
ITERATION_TIMES / 2);
@@ -722,7 +711,6 @@ public class IoTDBUDFWindowQueryIT {
   }
 
   @Test
-  @Ignore
   public void testSizeWindowUDFWithConstants() {
     try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
@@ -763,7 +751,6 @@ public class IoTDBUDFWindowQueryIT {
   }
 
   @Test
-  @Ignore
   public void testTimeWindowUDFWithConstants() {
     try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index 6ed6ec9ca1..0e08b682f1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -94,7 +94,6 @@ public class TransformOperator implements ProcessOperator {
     initInputLayer(inputDataTypes);
     initUdtfContext(outputExpressions, zoneId);
     initTransformers(inputLocations, outputExpressions, typeProvider);
-
     timeHeap = new TimeSelector(transformers.length << 1, isAscending);
     shouldIterateReadersToNextValid = new boolean[outputExpressions.length];
     Arrays.fill(shouldIterateReadersToNextValid, true);
@@ -174,6 +173,9 @@ public class TransformOperator implements ProcessOperator {
 
   @Override
   public final boolean hasNext() {
+    if (!timeHeap.isEmpty()) {
+      return true;
+    }
     try {
       if (iterateAllColumnsToNextValid() == 
YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
         return true;
@@ -182,7 +184,6 @@ public class TransformOperator implements ProcessOperator {
       LOGGER.error("TransformOperator#hasNext()", e);
       throw new RuntimeException(e);
     }
-
     return !timeHeap.isEmpty();
   }
 
@@ -340,7 +341,9 @@ public class TransformOperator implements ProcessOperator {
 
   @Override
   public boolean isFinished() {
-    return inputOperator.isFinished() && timeHeap.isEmpty();
+    // call hasNext first, or data of inputOperator could be missing
+    boolean flag = !hasNext();
+    return timeHeap.isEmpty() && (flag || inputOperator.isFinished());
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index e4f34d448e..ce8afcf154 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -187,7 +187,6 @@ import static 
org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode
  * run a fragment instance parallel and take full advantage of multi-cores
  */
 public class LocalExecutionPlanner {
-
   private static final MPPDataExchangeManager MPP_DATA_EXCHANGE_MANAGER =
       MPPDataExchangeService.getInstance().getMPPDataExchangeManager();
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
index 8a7ee2f0ad..18130cf77a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
@@ -377,8 +377,7 @@ public class 
SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
 
   @Override
   protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
-      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
-      throws IOException, QueryProcessException {
+      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
 
     final long timeInterval = strategy.getTimeInterval();
     final long slidingStep = strategy.getSlidingStep();
@@ -388,27 +387,37 @@ public class 
SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
     final ElasticSerializableTVListBackedSingleColumnWindow window =
         new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
 
-    long nextWindowTimeBeginGivenByStrategy = strategy.getDisplayWindowBegin();
-    if (tvList.size() == 0
-        && LayerCacheUtils.cachePoint(
-            parentLayerPointReaderDataType, parentLayerPointReader, tvList)
-        && nextWindowTimeBeginGivenByStrategy == Long.MIN_VALUE) {
-      // display window begin should be set to the same as the min timestamp 
of the query result
-      // set
-      nextWindowTimeBeginGivenByStrategy = tvList.getTime(0);
-    }
-    long finalNextWindowTimeBeginGivenByStrategy = 
nextWindowTimeBeginGivenByStrategy;
-
-    final boolean hasAtLeastOneRow = tvList.size() != 0;
+    final long nextWindowTimeBeginGivenByStrategy = 
strategy.getDisplayWindowBegin();
 
     return new LayerRowWindowReader() {
 
+      private boolean isFirstIteration = true;
       private boolean hasCached = false;
-      private long nextWindowTimeBegin = 
finalNextWindowTimeBeginGivenByStrategy;
+      private long nextWindowTimeBegin = nextWindowTimeBeginGivenByStrategy;
       private int nextIndexBegin = 0;
+      private boolean hasAtLeastOneRow;
 
       @Override
       public YieldableState yield() throws IOException, QueryProcessException {
+        if (isFirstIteration) {
+          if (tvList.size() == 0) {
+            final YieldableState yieldableState =
+                LayerCacheUtils.yieldPoint(
+                    parentLayerPointReaderDataType, parentLayerPointReader, 
tvList);
+            if (yieldableState != YieldableState.YIELDABLE) {
+              return yieldableState;
+            }
+          }
+          if (nextWindowTimeBeginGivenByStrategy == Long.MIN_VALUE) {
+            // display window begin should be set to the same as the min 
timestamp of the query
+            // result
+            // set
+            nextWindowTimeBegin = tvList.getTime(0);
+          }
+          hasAtLeastOneRow = tvList.size() != 0;
+          isFirstIteration = false;
+        }
+
         if (hasCached) {
           return YieldableState.YIELDABLE;
         }
@@ -458,6 +467,19 @@ public class 
SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
 
       @Override
       public boolean next() throws IOException, QueryProcessException {
+        if (isFirstIteration) {
+          if (tvList.size() == 0
+              && LayerCacheUtils.cachePoint(
+                  parentLayerPointReaderDataType, parentLayerPointReader, 
tvList)
+              && nextWindowTimeBeginGivenByStrategy == Long.MIN_VALUE) {
+            // display window begin should be set to the same as the min 
timestamp of the query
+            // result
+            // set
+            nextWindowTimeBegin = tvList.getTime(0);
+          }
+          hasAtLeastOneRow = tvList.size() != 0;
+          isFirstIteration = false;
+        }
         if (hasCached) {
           return true;
         }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
index 51c2a330c8..d22bf0a091 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
@@ -275,12 +275,14 @@ public class 
SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
       @Override
       public YieldableState yield() throws IOException, QueryProcessException {
         if (isFirstIteration) {
-          if (tvList.size() == 0 && nextWindowTimeBegin == Long.MIN_VALUE) {
+          if (tvList.size() == 0) {
             final YieldableState yieldableState =
                 LayerCacheUtils.yieldPoint(dataType, parentLayerPointReader, 
tvList);
             if (yieldableState != YieldableState.YIELDABLE) {
               return yieldableState;
             }
+          }
+          if (nextWindowTimeBegin == Long.MIN_VALUE) {
             // display window begin should be set to the same as the min 
timestamp of the query
             // result set
             nextWindowTimeBegin = tvList.getTime(0);

Reply via email to