This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch iotdb-3629
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/iotdb-3629 by this push:
     new f5c213e190 [iotdb-3629] Fix TimeJoinOperator may cause Source handle is blocked exception
f5c213e190 is described below

commit f5c213e1900eaa677d8d347601f4f5fd0f3aba4f
Author: JackieTien97 <[email protected]>
AuthorDate: Mon Jun 27 22:11:11 2022 +0800

    [iotdb-3629] Fix TimeJoinOperator may cause Source handle is blocked exception
---
 .../db/mpp/execution/operator/process/AggregationOperator.java     | 2 +-
 .../execution/operator/schema/SchemaQueryOrderByHeatOperator.java  | 2 +-
 .../execution/schedule/DriverTaskTimeoutSentinelThreadTest.java    | 7 ++++---
 3 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 36d3495f30..e4de130719 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -87,7 +87,7 @@ public class AggregationOperator implements ProcessOperator {
   @Override
   public ListenableFuture<?> isBlocked() {
     for (int i = 0; i < inputOperatorsCount; i++) {
-      ListenableFuture<Void> blocked = children.get(i).isBlocked();
+      ListenableFuture<?> blocked = children.get(i).isBlocked();
       if (!blocked.isDone()) {
         return blocked;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
index b29c3805ae..7bb06a3114 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
@@ -120,7 +120,7 @@ public class SchemaQueryOrderByHeatOperator implements ProcessOperator {
     for (int i = 0; i < operators.size(); i++) {
       if (!noMoreTsBlocks[i]) {
         Operator operator = operators.get(i);
-        ListenableFuture<Void> blocked = operator.isBlocked();
+        ListenableFuture<?> blocked = operator.isBlocked();
         while (operator.hasNext() && blocked.isDone()) {
           TsBlock tsBlock = operator.next();
           if (null != tsBlock && !tsBlock.isEmpty()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
index 6bb9929d65..d8c4a4ddf7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
@@ -157,7 +157,8 @@ public class DriverTaskTimeoutSentinelThreadTest {
     PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
     FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
     Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-    Mockito.when(mockDriver.processFor(Mockito.any())).thenReturn(Futures.immediateVoidFuture());
+    Mockito.when(mockDriver.processFor(Mockito.any()))
+        .thenAnswer(ans -> Futures.immediateVoidFuture());
     Mockito.when(mockDriver.isFinished()).thenReturn(true);
     AbstractDriverThread executor =
         new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
@@ -205,7 +206,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
     PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
     FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
     Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-    Mockito.when(mockDriver.processFor(Mockito.any())).thenReturn(mockFuture);
+    Mockito.when(mockDriver.processFor(Mockito.any())).thenAnswer(ans -> mockFuture);
     Mockito.when(mockDriver.isFinished()).thenReturn(false);
     AbstractDriverThread executor =
         new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
@@ -253,7 +254,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
     PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
     FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
     Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-    Mockito.when(mockDriver.processFor(Mockito.any())).thenReturn(mockFuture);
+    Mockito.when(mockDriver.processFor(Mockito.any())).thenAnswer(ans -> mockFuture);
     Mockito.when(mockDriver.isFinished()).thenReturn(false);
     AbstractDriverThread executor =
         new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);

Reply via email to