This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch iotdb-3629
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/iotdb-3629 by this push:
new f5c213e190 [iotdb-3629] Fix TimeJoinOperator may cause Source handle
is blocked exception
f5c213e190 is described below
commit f5c213e1900eaa677d8d347601f4f5fd0f3aba4f
Author: JackieTien97 <[email protected]>
AuthorDate: Mon Jun 27 22:11:11 2022 +0800
[iotdb-3629] Fix TimeJoinOperator may cause Source handle is blocked
exception
---
.../db/mpp/execution/operator/process/AggregationOperator.java | 2 +-
.../execution/operator/schema/SchemaQueryOrderByHeatOperator.java | 2 +-
.../execution/schedule/DriverTaskTimeoutSentinelThreadTest.java | 7 ++++---
3 files changed, 6 insertions(+), 5 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 36d3495f30..e4de130719 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -87,7 +87,7 @@ public class AggregationOperator implements ProcessOperator {
@Override
public ListenableFuture<?> isBlocked() {
for (int i = 0; i < inputOperatorsCount; i++) {
- ListenableFuture<Void> blocked = children.get(i).isBlocked();
+ ListenableFuture<?> blocked = children.get(i).isBlocked();
if (!blocked.isDone()) {
return blocked;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
index b29c3805ae..7bb06a3114 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
@@ -120,7 +120,7 @@ public class SchemaQueryOrderByHeatOperator implements ProcessOperator {
for (int i = 0; i < operators.size(); i++) {
if (!noMoreTsBlocks[i]) {
Operator operator = operators.get(i);
- ListenableFuture<Void> blocked = operator.isBlocked();
+ ListenableFuture<?> blocked = operator.isBlocked();
while (operator.hasNext() && blocked.isDone()) {
TsBlock tsBlock = operator.next();
if (null != tsBlock && !tsBlock.isEmpty()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
index 6bb9929d65..d8c4a4ddf7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
@@ -157,7 +157,8 @@ public class DriverTaskTimeoutSentinelThreadTest {
PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
- Mockito.when(mockDriver.processFor(Mockito.any())).thenReturn(Futures.immediateVoidFuture());
+ Mockito.when(mockDriver.processFor(Mockito.any()))
+ .thenAnswer(ans -> Futures.immediateVoidFuture());
Mockito.when(mockDriver.isFinished()).thenReturn(true);
AbstractDriverThread executor =
new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
@@ -205,7 +206,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
- Mockito.when(mockDriver.processFor(Mockito.any())).thenReturn(mockFuture);
+ Mockito.when(mockDriver.processFor(Mockito.any())).thenAnswer(ans -> mockFuture);
Mockito.when(mockDriver.isFinished()).thenReturn(false);
AbstractDriverThread executor =
new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
@@ -253,7 +254,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
- Mockito.when(mockDriver.processFor(Mockito.any())).thenReturn(mockFuture);
+ Mockito.when(mockDriver.processFor(Mockito.any())).thenAnswer(ans -> mockFuture);
Mockito.when(mockDriver.isFinished()).thenReturn(false);
AbstractDriverThread executor =
new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);