This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3008429c95 [IOTDB-3629] Fix TimeJoinOperator may cause Source handle is blocked exception (#6469)
3008429c95 is described below

commit 3008429c95f8ff8f2a77c8052d9aa055bf9c5437
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Jun 28 08:50:35 2022 +0800

    [IOTDB-3629] Fix TimeJoinOperator may cause Source handle is blocked exception (#6469)
---
 .../iotdb/db/mpp/execution/driver/DataDriver.java  |  2 +-
 .../iotdb/db/mpp/execution/driver/Driver.java      | 22 ++++++++++------------
 .../iotdb/db/mpp/execution/driver/IDriver.java     |  2 +-
 .../db/mpp/execution/driver/SchemaDriver.java      |  2 +-
 .../db/mpp/execution/exchange/ISinkHandle.java     |  2 +-
 .../db/mpp/execution/exchange/ISourceHandle.java   |  2 +-
 .../db/mpp/execution/exchange/LocalSinkHandle.java |  2 +-
 .../mpp/execution/exchange/LocalSourceHandle.java  |  2 +-
 .../db/mpp/execution/exchange/SinkHandle.java      |  2 +-
 .../db/mpp/execution/exchange/SourceHandle.java    |  2 +-
 .../iotdb/db/mpp/execution/operator/Operator.java  |  4 ++--
 .../operator/process/AggregationOperator.java      |  4 ++--
 .../operator/process/DeviceMergeOperator.java      | 12 ++++++++----
 .../operator/process/DeviceViewOperator.java       |  4 ++--
 .../execution/operator/process/FillOperator.java   |  2 +-
 .../operator/process/LastQueryMergeOperator.java   |  2 +-
 .../execution/operator/process/LimitOperator.java  |  2 +-
 .../operator/process/LinearFillOperator.java       |  2 +-
 .../execution/operator/process/OffsetOperator.java |  2 +-
 .../process/RawDataAggregationOperator.java        |  2 +-
 .../process/SlidingWindowAggregationOperator.java  |  2 +-
 .../execution/operator/process/SortOperator.java   |  2 +-
 .../operator/process/TimeJoinOperator.java         | 11 +++++++----
 .../operator/process/TransformOperator.java        |  2 +-
 .../operator/process/UpdateLastCacheOperator.java  |  2 +-
 .../operator/schema/CountMergeOperator.java        | 11 +++++++----
 .../schema/NodeManageMemoryMergeOperator.java      |  2 +-
 .../operator/schema/NodePathsConvertOperator.java  |  2 +-
 .../operator/schema/NodePathsCountOperator.java    |  4 ++--
 .../operator/schema/SchemaFetchMergeOperator.java  |  2 +-
 .../operator/schema/SchemaQueryMergeOperator.java  |  2 +-
 .../schema/SchemaQueryOrderByHeatOperator.java     |  4 ++--
 .../operator/source/ExchangeOperator.java          |  4 ++--
 .../mpp/execution/schedule/DriverTaskThread.java   |  2 +-
 .../db/mpp/execution/schedule/task/DriverTask.java |  2 +-
 .../db/mpp/plan/execution/QueryExecution.java      |  2 +-
 .../plan/execution/memory/MemorySourceHandle.java  |  2 +-
 .../iotdb/db/mpp/execution/DataDriverTest.java     |  2 +-
 .../execution/exchange/LocalSinkHandleTest.java    |  2 +-
 .../execution/exchange/LocalSourceHandleTest.java  |  4 ++--
 .../db/mpp/execution/exchange/StubSinkHandle.java  |  2 +-
 .../execution/memory/MemorySourceHandleTest.java   |  2 +-
 .../DriverTaskTimeoutSentinelThreadTest.java       | 11 ++++++-----
 43 files changed, 83 insertions(+), 74 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
index 18f55110d8..ba6909e4fa 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
@@ -60,7 +60,7 @@ public class DataDriver extends Driver {
   }
 
   @Override
-  protected boolean init(SettableFuture<Void> blockedFuture) {
+  protected boolean init(SettableFuture<?> blockedFuture) {
     if (!init) {
       try {
         initialize();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index e01e2aefd4..acf8e89470 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -54,8 +54,7 @@ public abstract class Driver implements IDriver {
   protected final Operator root;
   protected final ISinkHandle sinkHandle;
   protected final DriverContext driverContext;
-  protected final AtomicReference<SettableFuture<Void>> driverBlockedFuture =
-      new AtomicReference<>();
+  protected final AtomicReference<SettableFuture<?>> driverBlockedFuture = new AtomicReference<>();
   protected final AtomicReference<State> state = new AtomicReference<>(State.ALIVE);
 
   protected final DriverLock exclusiveLock = new DriverLock();
@@ -93,15 +92,15 @@ public abstract class Driver implements IDriver {
    *
    * @return true if init succeed, false otherwise
    */
-  protected abstract boolean init(SettableFuture<Void> blockedFuture);
+  protected abstract boolean init(SettableFuture<?> blockedFuture);
 
   /** release resource this driver used */
   protected abstract void releaseResource();
 
   @Override
-  public ListenableFuture<Void> processFor(Duration duration) {
+  public ListenableFuture<?> processFor(Duration duration) {
 
-    SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
+    SettableFuture<?> blockedFuture = driverBlockedFuture.get();
     // initialization may be time-consuming, so we keep it in the processFor method
     // in normal case, it won't cause deadlock and should finish soon, otherwise it will be a
     // critical bug
@@ -116,7 +115,7 @@ public abstract class Driver implements IDriver {
 
     long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
 
-    Optional<ListenableFuture<Void>> result =
+    Optional<ListenableFuture<?>> result =
         tryWithLock(
             100,
             TimeUnit.MILLISECONDS,
@@ -124,7 +123,7 @@ public abstract class Driver implements IDriver {
             () -> {
               long start = System.nanoTime();
               do {
-                ListenableFuture<Void> future = processInternal();
+                ListenableFuture<?> future = processInternal();
                 if (!future.isDone()) {
                   return updateDriverBlockedFuture(future);
                 }
@@ -174,9 +173,9 @@ public abstract class Driver implements IDriver {
     return finished;
   }
 
-  private ListenableFuture<Void> processInternal() {
+  private ListenableFuture<?> processInternal() {
     try {
-      ListenableFuture<Void> blocked = root.isBlocked();
+      ListenableFuture<?> blocked = root.isBlocked();
       if (!blocked.isDone()) {
         return blocked;
       }
@@ -211,11 +210,10 @@ public abstract class Driver implements IDriver {
     }
   }
 
-  private ListenableFuture<Void> updateDriverBlockedFuture(
-      ListenableFuture<Void> sourceBlockedFuture) {
+  private ListenableFuture<?> updateDriverBlockedFuture(ListenableFuture<?> sourceBlockedFuture) {
     // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed
     // or any of the operators gets a memory revocation request
-    SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
+    SettableFuture<?> newDriverBlockedFuture = SettableFuture.create();
     driverBlockedFuture.set(newDriverBlockedFuture);
     sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor());
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
index 3111a22c49..0d097bfc7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
@@ -47,7 +47,7 @@ public interface IDriver {
    *     processing. Otherwise, meaning that this IDriver is blocked and not ready for next
    *     processing.
    */
-  ListenableFuture<Void> processFor(Duration duration);
+  ListenableFuture<?> processFor(Duration duration);
 
   /**
    * the id information about this IDriver.
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
index 5c52aa1d20..01ed2315b8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
@@ -34,7 +34,7 @@ public class SchemaDriver extends Driver {
   }
 
   @Override
-  protected boolean init(SettableFuture<Void> blockedFuture) {
+  protected boolean init(SettableFuture<?> blockedFuture) {
     return true;
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
index 27ac4cbafb..b4be67e164 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
@@ -34,7 +34,7 @@ public interface ISinkHandle {
   long getBufferRetainedSizeInBytes();
 
   /** Get a future that will be completed when the output buffer is not full. */
-  ListenableFuture<Void> isFull();
+  ListenableFuture<?> isFull();
 
   /**
    * Send a list of tsblocks to an unpartitioned output buffer. If no-more-tsblocks has been set,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
index 376de3cf5d..be62bff4ff 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
@@ -44,7 +44,7 @@ public interface ISourceHandle {
   boolean isFinished();
 
   /** Get a future that will be completed when the input buffer is not empty. */
-  ListenableFuture<Void> isBlocked();
+  ListenableFuture<?> isBlocked();
 
   /** If this handle is aborted. */
   boolean isAborted();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
index c7b4b982ad..852e79ac8b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
@@ -71,7 +71,7 @@ public class LocalSinkHandle implements ISinkHandle {
   }
 
   @Override
-  public synchronized ListenableFuture<Void> isFull() {
+  public synchronized ListenableFuture<?> isFull() {
     if (aborted) {
       throw new IllegalStateException("Sink handle is closed.");
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
index 8f47f9d852..c101e7b644 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
@@ -122,7 +122,7 @@ public class LocalSourceHandle implements ISourceHandle {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
     if (aborted) {
       throw new IllegalStateException("Source handle is closed.");
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 71fe329ed4..3619de44f4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -109,7 +109,7 @@ public class SinkHandle implements ISinkHandle {
   }
 
   @Override
-  public synchronized ListenableFuture<Void> isFull() {
+  public synchronized ListenableFuture<?> isFull() {
     if (aborted) {
       throw new IllegalStateException("Sink handle is aborted.");
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index a19d080d07..facd0c1e0f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -201,7 +201,7 @@ public class SourceHandle implements ISourceHandle {
   }
 
   @Override
-  public synchronized ListenableFuture<Void> isBlocked() {
+  public synchronized ListenableFuture<?> isBlocked() {
     if (aborted) {
       throw new IllegalStateException("Source handle is aborted.");
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
index 03c12262dd..dfa08e033f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
@@ -26,7 +26,7 @@ import static 
com.google.common.util.concurrent.Futures.immediateVoidFuture;
 
 public interface Operator extends AutoCloseable {
 
-  ListenableFuture<Void> NOT_BLOCKED = immediateVoidFuture();
+  ListenableFuture<?> NOT_BLOCKED = immediateVoidFuture();
 
   OperatorContext getOperatorContext();
 
@@ -34,7 +34,7 @@ public interface Operator extends AutoCloseable {
    * Returns a future that will be completed when the operator becomes unblocked. If the operator is
    * not blocked, this method should return {@code NOT_BLOCKED}.
    */
-  default ListenableFuture<Void> isBlocked() {
+  default ListenableFuture<?> isBlocked() {
     return NOT_BLOCKED;
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 4dd1d9c411..e4de130719 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -85,9 +85,9 @@ public class AggregationOperator implements ProcessOperator {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
     for (int i = 0; i < inputOperatorsCount; i++) {
-      ListenableFuture<Void> blocked = children.get(i).isBlocked();
+      ListenableFuture<?> blocked = children.get(i).isBlocked();
       if (!blocked.isDone()) {
         return blocked;
       }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
index 3e4eacb4a7..d16d47aa28 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
@@ -32,9 +32,12 @@ import 
org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
 /**
  * DeviceMergeOperator is responsible for merging tsBlock coming from DeviceViewOperators.
  *
@@ -94,16 +97,17 @@ public class DeviceMergeOperator implements ProcessOperator 
{
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
+    List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
     for (int i = 0; i < inputOperatorsCount; i++) {
       if (!noMoreTsBlocks[i] && isTsBlockEmpty(i)) {
-        ListenableFuture<Void> blocked = deviceOperators.get(i).isBlocked();
+        ListenableFuture<?> blocked = deviceOperators.get(i).isBlocked();
         if (!blocked.isDone()) {
-          return blocked;
+          listenableFutures.add(blocked);
         }
       }
     }
-    return NOT_BLOCKED;
+    return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
index a95bee8cf1..127a26a8b6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
@@ -94,8 +94,8 @@ public class DeviceViewOperator implements ProcessOperator {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
-    ListenableFuture<Void> blocked = getCurDeviceOperator().isBlocked();
+  public ListenableFuture<?> isBlocked() {
+    ListenableFuture<?> blocked = getCurDeviceOperator().isBlocked();
     if (!blocked.isDone()) {
       return blocked;
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
index f853d5d9cd..da7ffe6ab1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
@@ -52,7 +52,7 @@ public class FillOperator implements ProcessOperator {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
     return child.isBlocked();
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
index 70ccf2134f..91aeecec02 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
@@ -50,7 +50,7 @@ public class LastQueryMergeOperator implements 
ProcessOperator {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
     if (currentIndex < inputOperatorsCount) {
       return children.get(currentIndex).isBlocked();
     } else {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
index 4ae909a2a8..ef3870fb17 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
@@ -46,7 +46,7 @@ public class LimitOperator implements ProcessOperator {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
     return child.isBlocked();
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
index dbbe1b638a..8193e20684 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -72,7 +72,7 @@ public class LinearFillOperator implements ProcessOperator {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
     return child.isBlocked();
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index b2136a5b4a..2820992745 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -46,7 +46,7 @@ public class OffsetOperator implements ProcessOperator {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
     return child.isBlocked();
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index 2d355d8775..01b0625c60 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -89,7 +89,7 @@ public class RawDataAggregationOperator implements 
ProcessOperator {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
     return child.isBlocked();
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 93bfa854bb..2685e3e465 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -152,7 +152,7 @@ public class SlidingWindowAggregationOperator implements 
ProcessOperator {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
     return child.isBlocked();
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
index e3c297fe47..38bda24bf7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
@@ -31,7 +31,7 @@ public class SortOperator implements ProcessOperator {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
     return ProcessOperator.super.isBlocked();
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
index 219342fe5b..5a576ea5cc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
@@ -31,9 +31,11 @@ import 
org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.util.concurrent.Futures.successfulAsList;
 
 public class TimeJoinOperator implements ProcessOperator {
 
@@ -109,16 +111,17 @@ public class TimeJoinOperator implements ProcessOperator {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
+    List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
     for (int i = 0; i < inputOperatorsCount; i++) {
       if (!noMoreTsBlocks[i] && empty(i)) {
-        ListenableFuture<Void> blocked = children.get(i).isBlocked();
+        ListenableFuture<?> blocked = children.get(i).isBlocked();
         if (!blocked.isDone()) {
-          return blocked;
+          listenableFutures.add(blocked);
         }
       }
     }
-    return NOT_BLOCKED;
+    return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index fc24eac43d..6ed6ec9ca1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -334,7 +334,7 @@ public class TransformOperator implements ProcessOperator {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
     return inputOperator.isBlocked();
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
index f2ec8c0cc8..1bfebb251c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
@@ -80,7 +80,7 @@ public class UpdateLastCacheOperator implements 
ProcessOperator {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
     return child.isBlocked();
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
index 5af5087bbe..a9176d658c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
@@ -36,6 +36,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
 public class CountMergeOperator implements ProcessOperator {
   private final PlanNodeId planNodeId;
   private final OperatorContext operatorContext;
@@ -56,14 +58,15 @@ public class CountMergeOperator implements ProcessOperator {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
+    List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
     for (Operator child : children) {
-      ListenableFuture<Void> blocked = child.isBlocked();
+      ListenableFuture<?> blocked = child.isBlocked();
       if (!blocked.isDone()) {
-        return blocked;
+        listenableFutures.add(blocked);
       }
     }
-    return NOT_BLOCKED;
+    return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
index e6e15b2e3f..22c1d61d92 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
@@ -52,7 +52,7 @@ public class NodeManageMemoryMergeOperator implements 
ProcessOperator {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
     return child.isBlocked();
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
index bb1b50228d..e6bfa76e23 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
@@ -53,7 +53,7 @@ public class NodePathsConvertOperator implements 
ProcessOperator {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
     return child.isBlocked();
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
index 3c0614589d..ca8f8ef5da 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
@@ -70,8 +70,8 @@ public class NodePathsCountOperator implements 
ProcessOperator {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
-    ListenableFuture<Void> blocked = child.isBlocked();
+  public ListenableFuture<?> isBlocked() {
+    ListenableFuture<?> blocked = child.isBlocked();
     while (child.hasNext() && blocked.isDone()) {
       TsBlock tsBlock = child.next();
       if (null != tsBlock && !tsBlock.isEmpty()) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
index 4c862fee6e..57ea220b63 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
@@ -64,7 +64,7 @@ public class SchemaFetchMergeOperator implements 
ProcessOperator {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
     return children.get(currentIndex).isBlocked();
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
index d594b38bd8..87df583494 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
@@ -65,7 +65,7 @@ public class SchemaQueryMergeOperator implements 
ProcessOperator {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
     return children.get(currentIndex).isBlocked();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
index 22b7156f28..7bb06a3114 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
@@ -116,11 +116,11 @@ public class SchemaQueryOrderByHeatOperator implements ProcessOperator {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
     for (int i = 0; i < operators.size(); i++) {
       if (!noMoreTsBlocks[i]) {
         Operator operator = operators.get(i);
-        ListenableFuture<Void> blocked = operator.isBlocked();
+        ListenableFuture<?> blocked = operator.isBlocked();
         while (operator.hasNext() && blocked.isDone()) {
           TsBlock tsBlock = operator.next();
           if (null != tsBlock && !tsBlock.isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index 6534cffb76..3579a642bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -33,7 +33,7 @@ public class ExchangeOperator implements SourceOperator {
 
   private final PlanNodeId sourceId;
 
-  private ListenableFuture<Void> isBlocked = NOT_BLOCKED;
+  private ListenableFuture<?> isBlocked = NOT_BLOCKED;
 
   public ExchangeOperator(
       OperatorContext operatorContext, ISourceHandle sourceHandle, PlanNodeId sourceId) {
@@ -68,7 +68,7 @@ public class ExchangeOperator implements SourceOperator {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
     // Avoid registering a new callback in the source handle when one is already pending
     if (isBlocked.isDone()) {
       isBlocked = sourceHandle.isBlocked();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
index b013377a13..02a795cff3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
@@ -55,7 +55,7 @@ public class DriverTaskThread extends AbstractDriverThread {
     }
     IDriver instance = task.getFragmentInstance();
     CpuTimer timer = new CpuTimer();
-    ListenableFuture<Void> future = instance.processFor(EXECUTION_TIME_SLICE);
+    ListenableFuture<?> future = instance.processFor(EXECUTION_TIME_SLICE);
     CpuTimer.CpuDuration duration = timer.elapsedTime();
     // long cost = System.nanoTime() - startTime;
     // If the future is cancelled, the task is in an error and should be thrown.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index 418e448759..f65c1f3988 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -194,7 +194,7 @@ public class DriverTask implements IDIndexedAccessible {
     }
 
     @Override
-    public ListenableFuture<Void> processFor(Duration duration) {
+    public ListenableFuture<?> processFor(Duration duration) {
       return null;
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index f6a775c150..7a9ab53fa4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -289,7 +289,7 @@ public class QueryExecution implements IQueryExecution {
         stateMachine.transitionToFinished();
         return Optional.empty();
       }
-      ListenableFuture<Void> blocked = resultHandle.isBlocked();
+      ListenableFuture<?> blocked = resultHandle.isBlocked();
       blocked.get();
       if (!resultHandle.isFinished()) {
         return Optional.of(resultHandle.receive());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
index a6d7030a30..588407eae1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
@@ -66,7 +66,7 @@ public class MemorySourceHandle implements ISourceHandle {
   }
 
   @Override
-  public ListenableFuture<Void> isBlocked() {
+  public ListenableFuture<?> isBlocked() {
     return immediateFuture(null);
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index feec870147..0d19e99889 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -186,7 +186,7 @@ public class DataDriverTest {
 
         while (!dataDriver.isFinished()) {
           assertEquals(FragmentInstanceState.RUNNING, stateMachine.getState());
-          ListenableFuture<Void> blocked = dataDriver.processFor(EXECUTION_TIME_SLICE);
+          ListenableFuture<?> blocked = dataDriver.processFor(EXECUTION_TIME_SLICE);
           assertTrue(blocked.isDone());
         }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
index 4c9d2da468..422139a959 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
@@ -137,7 +137,7 @@ public class LocalSinkHandleTest {
       numOfSentTsblocks += 1;
     }
     Assert.assertEquals(6, numOfSentTsblocks);
-    ListenableFuture<Void> blocked = localSinkHandle.isFull();
+    ListenableFuture<?> blocked = localSinkHandle.isFull();
     Assert.assertFalse(blocked.isDone());
     Assert.assertFalse(localSinkHandle.isFinished());
     Assert.assertEquals(6 * mockTsBlockSize, localSinkHandle.getBufferRetainedSizeInBytes());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
index 1c3d6d5747..e1f49f5c77 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
@@ -71,7 +71,7 @@ public class LocalSourceHandleTest {
     // Consume tsblocks.
     Assert.assertTrue(localSourceHandle.isBlocked().isDone());
     localSourceHandle.receive();
-    ListenableFuture<Void> blocked = localSourceHandle.isBlocked();
+    ListenableFuture<?> blocked = localSourceHandle.isBlocked();
     Assert.assertTrue(blocked.isDone());
     Assert.assertFalse(localSourceHandle.isAborted());
     Assert.assertTrue(localSourceHandle.isFinished());
@@ -103,7 +103,7 @@ public class LocalSourceHandleTest {
             localPlanNodeId,
             queue,
             mockSourceHandleListener);
-    ListenableFuture<Void> future = localSourceHandle.isBlocked();
+    ListenableFuture<?> future = localSourceHandle.isBlocked();
     Assert.assertFalse(future.isDone());
     Assert.assertFalse(localSourceHandle.isAborted());
     Assert.assertFalse(localSourceHandle.isFinished());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
index b6e332168b..5e29f47066 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
@@ -54,7 +54,7 @@ public class StubSinkHandle implements ISinkHandle {
   }
 
   @Override
-  public ListenableFuture<Void> isFull() {
+  public ListenableFuture<?> isFull() {
     return NOT_BLOCKED;
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/memory/MemorySourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/memory/MemorySourceHandleTest.java
index fd91eb8e6b..7696e8d6f7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/memory/MemorySourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/memory/MemorySourceHandleTest.java
@@ -33,7 +33,7 @@ public class MemorySourceHandleTest {
     TsBlock rawResult = new TsBlock(0);
     MemorySourceHandle sourceHandle = new MemorySourceHandle(rawResult);
     Assert.assertFalse(sourceHandle.isFinished());
-    ListenableFuture<Void> blocked = sourceHandle.isBlocked();
+    ListenableFuture<?> blocked = sourceHandle.isBlocked();
     Assert.assertTrue(blocked.isDone());
     TsBlock result = sourceHandle.receive();
     Assert.assertEquals(rawResult, result);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
index 6f49600921..d8c4a4ddf7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
@@ -157,7 +157,8 @@ public class DriverTaskTimeoutSentinelThreadTest {
     PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
     FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
     Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-    Mockito.when(mockDriver.processFor(Mockito.any())).thenReturn(Futures.immediateVoidFuture());
+    Mockito.when(mockDriver.processFor(Mockito.any()))
+        .thenAnswer(ans -> Futures.immediateVoidFuture());
     Mockito.when(mockDriver.isFinished()).thenReturn(true);
     AbstractDriverThread executor =
         new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
@@ -189,7 +190,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
         new L1PriorityQueue<>(100, new DriverTask.TimeoutComparator(), new DriverTask());
 
     // Mock the instance with a blocked future
-    ListenableFuture<Void> mockFuture = Mockito.mock(ListenableFuture.class);
+    ListenableFuture<?> mockFuture = Mockito.mock(ListenableFuture.class);
     Mockito.when(mockFuture.isDone()).thenReturn(false);
     Mockito.doAnswer(
             ans -> {
@@ -205,7 +206,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
     PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
     FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
     Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-    Mockito.when(mockDriver.processFor(Mockito.any())).thenReturn(mockFuture);
+    Mockito.when(mockDriver.processFor(Mockito.any())).thenAnswer(ans -> mockFuture);
     Mockito.when(mockDriver.isFinished()).thenReturn(false);
     AbstractDriverThread executor =
         new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);
@@ -237,7 +238,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
         new L1PriorityQueue<>(100, new DriverTask.TimeoutComparator(), new DriverTask());
 
     // Mock the instance with a ready future
-    ListenableFuture<Void> mockFuture = Mockito.mock(ListenableFuture.class);
+    ListenableFuture<?> mockFuture = Mockito.mock(ListenableFuture.class);
     Mockito.when(mockFuture.isDone()).thenReturn(true);
     Mockito.doAnswer(
             ans -> {
@@ -253,7 +254,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
     PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
     FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId, "inst-0");
     Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-    Mockito.when(mockDriver.processFor(Mockito.any())).thenReturn(mockFuture);
+    Mockito.when(mockDriver.processFor(Mockito.any())).thenAnswer(ans -> mockFuture);
     Mockito.when(mockDriver.isFinished()).thenReturn(false);
     AbstractDriverThread executor =
         new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue, mockScheduler);

Reply via email to