This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 703c7f427a [IOTDB-3627] Fix issue of slidingTimeWindowQuery (#6567)
703c7f427a is described below
commit 703c7f427a473ada5cd741619c27e406769d1f1e
Author: lancelly <[email protected]>
AuthorDate: Tue Jul 5 10:55:49 2022 +0800
[IOTDB-3627] Fix issue of slidingTimeWindowQuery (#6567)
---
.../org/apache/iotdb/db/it/IoTDBNestedQueryIT.java | 1 -
.../iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java | 19 ++------
.../operator/process/TransformOperator.java | 9 ++--
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 1 -
...InputColumnMultiReferenceIntermediateLayer.java | 52 +++++++++++++++-------
...nputColumnSingleReferenceIntermediateLayer.java | 4 +-
6 files changed, 49 insertions(+), 37 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBNestedQueryIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBNestedQueryIT.java
index 9110a2c7e4..27d860828a 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBNestedQueryIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBNestedQueryIT.java
@@ -247,7 +247,6 @@ public class IoTDBNestedQueryIT {
}
@Test
- @Ignore
public void testNestedWindowingFunctionExpressions() {
final int[] windows =
new int[] {
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
index 7458e4e202..8c8007629d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
@@ -25,11 +25,7 @@ import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.itbase.constant.UDFTestConstant;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.*;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -45,7 +41,7 @@ import static org.junit.Assert.fail;
@Category({LocalStandaloneIT.class, ClusterIT.class})
public class IoTDBUDFWindowQueryIT {
- protected static final int ITERATION_TIMES = 100;
+ protected static final int ITERATION_TIMES = 10000;
protected static boolean enableSeqSpaceCompaction;
protected static boolean enableUnseqSpaceCompaction;
@@ -176,6 +172,7 @@ public class IoTDBUDFWindowQueryIT {
testSlidingSizeWindow(3 * ITERATION_TIMES);
}
+ // todo: remove ignore when exception handler in IT finishes
@Test
@Ignore
public void testSlidingSizeWindow7() {
@@ -259,7 +256,6 @@ public class IoTDBUDFWindowQueryIT {
}
@Test
- @Ignore
public void testSlidingTimeWindow1() {
testSlidingTimeWindow(
(int) (0.33 * ITERATION_TIMES),
@@ -269,7 +265,6 @@ public class IoTDBUDFWindowQueryIT {
}
@Test
- @Ignore
public void testSlidingTimeWindow2() {
testSlidingTimeWindow(
(int) (0.033 * ITERATION_TIMES),
@@ -279,7 +274,6 @@ public class IoTDBUDFWindowQueryIT {
}
@Test
- @Ignore
public void testSlidingTimeWindow3() {
testSlidingTimeWindow(
(int) (2 * 0.033 * ITERATION_TIMES),
@@ -289,7 +283,6 @@ public class IoTDBUDFWindowQueryIT {
}
@Test
- @Ignore
public void testSlidingTimeWindow4() {
testSlidingTimeWindow(
(int) (0.033 * ITERATION_TIMES),
@@ -299,27 +292,23 @@ public class IoTDBUDFWindowQueryIT {
}
@Test
- @Ignore
public void testSlidingTimeWindow5() {
testSlidingTimeWindow(ITERATION_TIMES, ITERATION_TIMES, 0,
ITERATION_TIMES);
}
@Test
- @Ignore
public void testSlidingTimeWindow6() {
testSlidingTimeWindow(
(int) (1.01 * ITERATION_TIMES), (int) (0.01 * ITERATION_TIMES), 0,
ITERATION_TIMES / 2);
}
@Test
- @Ignore
public void testSlidingTimeWindow7() {
testSlidingTimeWindow(
(int) (0.01 * ITERATION_TIMES), (int) (1.01 * ITERATION_TIMES), 0,
ITERATION_TIMES / 2);
}
@Test
- @Ignore
public void testSlidingTimeWindow8() {
testSlidingTimeWindow(
(int) (1.01 * ITERATION_TIMES), (int) (1.01 * ITERATION_TIMES), 0,
ITERATION_TIMES / 2);
@@ -722,7 +711,6 @@ public class IoTDBUDFWindowQueryIT {
}
@Test
- @Ignore
public void testSizeWindowUDFWithConstants() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
@@ -763,7 +751,6 @@ public class IoTDBUDFWindowQueryIT {
}
@Test
- @Ignore
public void testTimeWindowUDFWithConstants() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index 6ed6ec9ca1..0e08b682f1 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -94,7 +94,6 @@ public class TransformOperator implements ProcessOperator {
initInputLayer(inputDataTypes);
initUdtfContext(outputExpressions, zoneId);
initTransformers(inputLocations, outputExpressions, typeProvider);
-
timeHeap = new TimeSelector(transformers.length << 1, isAscending);
shouldIterateReadersToNextValid = new boolean[outputExpressions.length];
Arrays.fill(shouldIterateReadersToNextValid, true);
@@ -174,6 +173,9 @@ public class TransformOperator implements ProcessOperator {
@Override
public final boolean hasNext() {
+ if (!timeHeap.isEmpty()) {
+ return true;
+ }
try {
if (iterateAllColumnsToNextValid() ==
YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
return true;
@@ -182,7 +184,6 @@ public class TransformOperator implements ProcessOperator {
LOGGER.error("TransformOperator#hasNext()", e);
throw new RuntimeException(e);
}
-
return !timeHeap.isEmpty();
}
@@ -340,7 +341,9 @@ public class TransformOperator implements ProcessOperator {
@Override
public boolean isFinished() {
- return inputOperator.isFinished() && timeHeap.isEmpty();
+ // call hasNext first, or data of inputOperator could be missing
+ boolean flag = !hasNext();
+ return timeHeap.isEmpty() && (flag || inputOperator.isFinished());
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index e4f34d448e..ce8afcf154 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -187,7 +187,6 @@ import static
org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode
* run a fragment instance parallel and take full advantage of multi-cores
*/
public class LocalExecutionPlanner {
-
private static final MPPDataExchangeManager MPP_DATA_EXCHANGE_MANAGER =
MPPDataExchangeService.getInstance().getMPPDataExchangeManager();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
index 8a7ee2f0ad..18130cf77a 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
@@ -377,8 +377,7 @@ public class
SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
@Override
protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
- SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
- throws IOException, QueryProcessException {
+ SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
final long timeInterval = strategy.getTimeInterval();
final long slidingStep = strategy.getSlidingStep();
@@ -388,27 +387,37 @@ public class
SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
final ElasticSerializableTVListBackedSingleColumnWindow window =
new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
- long nextWindowTimeBeginGivenByStrategy = strategy.getDisplayWindowBegin();
- if (tvList.size() == 0
- && LayerCacheUtils.cachePoint(
- parentLayerPointReaderDataType, parentLayerPointReader, tvList)
- && nextWindowTimeBeginGivenByStrategy == Long.MIN_VALUE) {
- // display window begin should be set to the same as the min timestamp of the query result
- // set
- nextWindowTimeBeginGivenByStrategy = tvList.getTime(0);
- }
- long finalNextWindowTimeBeginGivenByStrategy =
nextWindowTimeBeginGivenByStrategy;
-
- final boolean hasAtLeastOneRow = tvList.size() != 0;
+ final long nextWindowTimeBeginGivenByStrategy =
strategy.getDisplayWindowBegin();
return new LayerRowWindowReader() {
+ private boolean isFirstIteration = true;
private boolean hasCached = false;
- private long nextWindowTimeBegin =
finalNextWindowTimeBeginGivenByStrategy;
+ private long nextWindowTimeBegin = nextWindowTimeBeginGivenByStrategy;
private int nextIndexBegin = 0;
+ private boolean hasAtLeastOneRow;
@Override
public YieldableState yield() throws IOException, QueryProcessException {
+ if (isFirstIteration) {
+ if (tvList.size() == 0) {
+ final YieldableState yieldableState =
+ LayerCacheUtils.yieldPoint(
+ parentLayerPointReaderDataType, parentLayerPointReader,
tvList);
+ if (yieldableState != YieldableState.YIELDABLE) {
+ return yieldableState;
+ }
+ }
+ if (nextWindowTimeBeginGivenByStrategy == Long.MIN_VALUE) {
+ // display window begin should be set to the same as the min timestamp of the query
+ // result
+ // set
+ nextWindowTimeBegin = tvList.getTime(0);
+ }
+ hasAtLeastOneRow = tvList.size() != 0;
+ isFirstIteration = false;
+ }
+
if (hasCached) {
return YieldableState.YIELDABLE;
}
@@ -458,6 +467,19 @@ public class
SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
@Override
public boolean next() throws IOException, QueryProcessException {
+ if (isFirstIteration) {
+ if (tvList.size() == 0
+ && LayerCacheUtils.cachePoint(
+ parentLayerPointReaderDataType, parentLayerPointReader,
tvList)
+ && nextWindowTimeBeginGivenByStrategy == Long.MIN_VALUE) {
+ // display window begin should be set to the same as the min timestamp of the query
+ // result
+ // set
+ nextWindowTimeBegin = tvList.getTime(0);
+ }
+ hasAtLeastOneRow = tvList.size() != 0;
+ isFirstIteration = false;
+ }
if (hasCached) {
return true;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
index 51c2a330c8..d22bf0a091 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
@@ -275,12 +275,14 @@ public class
SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
@Override
public YieldableState yield() throws IOException, QueryProcessException {
if (isFirstIteration) {
- if (tvList.size() == 0 && nextWindowTimeBegin == Long.MIN_VALUE) {
+ if (tvList.size() == 0) {
final YieldableState yieldableState =
LayerCacheUtils.yieldPoint(dataType, parentLayerPointReader,
tvList);
if (yieldableState != YieldableState.YIELDABLE) {
return yieldableState;
}
+ }
+ if (nextWindowTimeBegin == Long.MIN_VALUE) {
// display window begin should be set to the same as the min timestamp of the query
// result set
nextWindowTimeBegin = tvList.getTime(0);