This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3008429c95 [IOTDB-3629] Fix TimeJoinOperator may cause Source handle
is blocked exception (#6469)
3008429c95 is described below
commit 3008429c95f8ff8f2a77c8052d9aa055bf9c5437
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Jun 28 08:50:35 2022 +0800
[IOTDB-3629] Fix TimeJoinOperator may cause Source handle is blocked
exception (#6469)
---
.../iotdb/db/mpp/execution/driver/DataDriver.java | 2 +-
.../iotdb/db/mpp/execution/driver/Driver.java | 22 ++++++++++------------
.../iotdb/db/mpp/execution/driver/IDriver.java | 2 +-
.../db/mpp/execution/driver/SchemaDriver.java | 2 +-
.../db/mpp/execution/exchange/ISinkHandle.java | 2 +-
.../db/mpp/execution/exchange/ISourceHandle.java | 2 +-
.../db/mpp/execution/exchange/LocalSinkHandle.java | 2 +-
.../mpp/execution/exchange/LocalSourceHandle.java | 2 +-
.../db/mpp/execution/exchange/SinkHandle.java | 2 +-
.../db/mpp/execution/exchange/SourceHandle.java | 2 +-
.../iotdb/db/mpp/execution/operator/Operator.java | 4 ++--
.../operator/process/AggregationOperator.java | 4 ++--
.../operator/process/DeviceMergeOperator.java | 12 ++++++++----
.../operator/process/DeviceViewOperator.java | 4 ++--
.../execution/operator/process/FillOperator.java | 2 +-
.../operator/process/LastQueryMergeOperator.java | 2 +-
.../execution/operator/process/LimitOperator.java | 2 +-
.../operator/process/LinearFillOperator.java | 2 +-
.../execution/operator/process/OffsetOperator.java | 2 +-
.../process/RawDataAggregationOperator.java | 2 +-
.../process/SlidingWindowAggregationOperator.java | 2 +-
.../execution/operator/process/SortOperator.java | 2 +-
.../operator/process/TimeJoinOperator.java | 11 +++++++----
.../operator/process/TransformOperator.java | 2 +-
.../operator/process/UpdateLastCacheOperator.java | 2 +-
.../operator/schema/CountMergeOperator.java | 11 +++++++----
.../schema/NodeManageMemoryMergeOperator.java | 2 +-
.../operator/schema/NodePathsConvertOperator.java | 2 +-
.../operator/schema/NodePathsCountOperator.java | 4 ++--
.../operator/schema/SchemaFetchMergeOperator.java | 2 +-
.../operator/schema/SchemaQueryMergeOperator.java | 2 +-
.../schema/SchemaQueryOrderByHeatOperator.java | 4 ++--
.../operator/source/ExchangeOperator.java | 4 ++--
.../mpp/execution/schedule/DriverTaskThread.java | 2 +-
.../db/mpp/execution/schedule/task/DriverTask.java | 2 +-
.../db/mpp/plan/execution/QueryExecution.java | 2 +-
.../plan/execution/memory/MemorySourceHandle.java | 2 +-
.../iotdb/db/mpp/execution/DataDriverTest.java | 2 +-
.../execution/exchange/LocalSinkHandleTest.java | 2 +-
.../execution/exchange/LocalSourceHandleTest.java | 4 ++--
.../db/mpp/execution/exchange/StubSinkHandle.java | 2 +-
.../execution/memory/MemorySourceHandleTest.java | 2 +-
.../DriverTaskTimeoutSentinelThreadTest.java | 11 ++++++-----
43 files changed, 83 insertions(+), 74 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
index 18f55110d8..ba6909e4fa 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
@@ -60,7 +60,7 @@ public class DataDriver extends Driver {
}
@Override
- protected boolean init(SettableFuture<Void> blockedFuture) {
+ protected boolean init(SettableFuture<?> blockedFuture) {
if (!init) {
try {
initialize();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index e01e2aefd4..acf8e89470 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -54,8 +54,7 @@ public abstract class Driver implements IDriver {
protected final Operator root;
protected final ISinkHandle sinkHandle;
protected final DriverContext driverContext;
- protected final AtomicReference<SettableFuture<Void>> driverBlockedFuture =
- new AtomicReference<>();
+ protected final AtomicReference<SettableFuture<?>> driverBlockedFuture = new
AtomicReference<>();
protected final AtomicReference<State> state = new
AtomicReference<>(State.ALIVE);
protected final DriverLock exclusiveLock = new DriverLock();
@@ -93,15 +92,15 @@ public abstract class Driver implements IDriver {
*
* @return true if init succeed, false otherwise
*/
- protected abstract boolean init(SettableFuture<Void> blockedFuture);
+ protected abstract boolean init(SettableFuture<?> blockedFuture);
/** release resource this driver used */
protected abstract void releaseResource();
@Override
- public ListenableFuture<Void> processFor(Duration duration) {
+ public ListenableFuture<?> processFor(Duration duration) {
- SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
+ SettableFuture<?> blockedFuture = driverBlockedFuture.get();
// initialization may be time-consuming, so we keep it in the processFor
method
// in normal case, it won't cause deadlock and should finish soon,
otherwise it will be a
// critical bug
@@ -116,7 +115,7 @@ public abstract class Driver implements IDriver {
long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
- Optional<ListenableFuture<Void>> result =
+ Optional<ListenableFuture<?>> result =
tryWithLock(
100,
TimeUnit.MILLISECONDS,
@@ -124,7 +123,7 @@ public abstract class Driver implements IDriver {
() -> {
long start = System.nanoTime();
do {
- ListenableFuture<Void> future = processInternal();
+ ListenableFuture<?> future = processInternal();
if (!future.isDone()) {
return updateDriverBlockedFuture(future);
}
@@ -174,9 +173,9 @@ public abstract class Driver implements IDriver {
return finished;
}
- private ListenableFuture<Void> processInternal() {
+ private ListenableFuture<?> processInternal() {
try {
- ListenableFuture<Void> blocked = root.isBlocked();
+ ListenableFuture<?> blocked = root.isBlocked();
if (!blocked.isDone()) {
return blocked;
}
@@ -211,11 +210,10 @@ public abstract class Driver implements IDriver {
}
}
- private ListenableFuture<Void> updateDriverBlockedFuture(
- ListenableFuture<Void> sourceBlockedFuture) {
+ private ListenableFuture<?> updateDriverBlockedFuture(ListenableFuture<?>
sourceBlockedFuture) {
// driverBlockedFuture will be completed as soon as the
sourceBlockedFuture is completed
// or any of the operators gets a memory revocation request
- SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create();
+ SettableFuture<?> newDriverBlockedFuture = SettableFuture.create();
driverBlockedFuture.set(newDriverBlockedFuture);
sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null),
directExecutor());
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
index 3111a22c49..0d097bfc7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
@@ -47,7 +47,7 @@ public interface IDriver {
* processing. Otherwise, meaning that this IDriver is blocked and not
ready for next
* processing.
*/
- ListenableFuture<Void> processFor(Duration duration);
+ ListenableFuture<?> processFor(Duration duration);
/**
* the id information about this IDriver.
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
index 5c52aa1d20..01ed2315b8 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
@@ -34,7 +34,7 @@ public class SchemaDriver extends Driver {
}
@Override
- protected boolean init(SettableFuture<Void> blockedFuture) {
+ protected boolean init(SettableFuture<?> blockedFuture) {
return true;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
index 27ac4cbafb..b4be67e164 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
@@ -34,7 +34,7 @@ public interface ISinkHandle {
long getBufferRetainedSizeInBytes();
/** Get a future that will be completed when the output buffer is not full.
*/
- ListenableFuture<Void> isFull();
+ ListenableFuture<?> isFull();
/**
* Send a list of tsblocks to an unpartitioned output buffer. If
no-more-tsblocks has been set,
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
index 376de3cf5d..be62bff4ff 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java
@@ -44,7 +44,7 @@ public interface ISourceHandle {
boolean isFinished();
/** Get a future that will be completed when the input buffer is not empty.
*/
- ListenableFuture<Void> isBlocked();
+ ListenableFuture<?> isBlocked();
/** If this handle is aborted. */
boolean isAborted();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
index c7b4b982ad..852e79ac8b 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
@@ -71,7 +71,7 @@ public class LocalSinkHandle implements ISinkHandle {
}
@Override
- public synchronized ListenableFuture<Void> isFull() {
+ public synchronized ListenableFuture<?> isFull() {
if (aborted) {
throw new IllegalStateException("Sink handle is closed.");
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
index 8f47f9d852..c101e7b644 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
@@ -122,7 +122,7 @@ public class LocalSourceHandle implements ISourceHandle {
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
if (aborted) {
throw new IllegalStateException("Source handle is closed.");
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 71fe329ed4..3619de44f4 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -109,7 +109,7 @@ public class SinkHandle implements ISinkHandle {
}
@Override
- public synchronized ListenableFuture<Void> isFull() {
+ public synchronized ListenableFuture<?> isFull() {
if (aborted) {
throw new IllegalStateException("Sink handle is aborted.");
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index a19d080d07..facd0c1e0f 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -201,7 +201,7 @@ public class SourceHandle implements ISourceHandle {
}
@Override
- public synchronized ListenableFuture<Void> isBlocked() {
+ public synchronized ListenableFuture<?> isBlocked() {
if (aborted) {
throw new IllegalStateException("Source handle is aborted.");
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
index 03c12262dd..dfa08e033f 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java
@@ -26,7 +26,7 @@ import static
com.google.common.util.concurrent.Futures.immediateVoidFuture;
public interface Operator extends AutoCloseable {
- ListenableFuture<Void> NOT_BLOCKED = immediateVoidFuture();
+ ListenableFuture<?> NOT_BLOCKED = immediateVoidFuture();
OperatorContext getOperatorContext();
@@ -34,7 +34,7 @@ public interface Operator extends AutoCloseable {
* Returns a future that will be completed when the operator becomes
unblocked. If the operator is
* not blocked, this method should return {@code NOT_BLOCKED}.
*/
- default ListenableFuture<Void> isBlocked() {
+ default ListenableFuture<?> isBlocked() {
return NOT_BLOCKED;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 4dd1d9c411..e4de130719 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -85,9 +85,9 @@ public class AggregationOperator implements ProcessOperator {
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
for (int i = 0; i < inputOperatorsCount; i++) {
- ListenableFuture<Void> blocked = children.get(i).isBlocked();
+ ListenableFuture<?> blocked = children.get(i).isBlocked();
if (!blocked.isDone()) {
return blocked;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
index 3e4eacb4a7..d16d47aa28 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
@@ -32,9 +32,12 @@ import
org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
/**
* DeviceMergeOperator is responsible for merging tsBlock coming from
DeviceViewOperators.
*
@@ -94,16 +97,17 @@ public class DeviceMergeOperator implements ProcessOperator
{
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
+ List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
for (int i = 0; i < inputOperatorsCount; i++) {
if (!noMoreTsBlocks[i] && isTsBlockEmpty(i)) {
- ListenableFuture<Void> blocked = deviceOperators.get(i).isBlocked();
+ ListenableFuture<?> blocked = deviceOperators.get(i).isBlocked();
if (!blocked.isDone()) {
- return blocked;
+ listenableFutures.add(blocked);
}
}
}
- return NOT_BLOCKED;
+ return listenableFutures.isEmpty() ? NOT_BLOCKED :
successfulAsList(listenableFutures);
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
index a95bee8cf1..127a26a8b6 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
@@ -94,8 +94,8 @@ public class DeviceViewOperator implements ProcessOperator {
}
@Override
- public ListenableFuture<Void> isBlocked() {
- ListenableFuture<Void> blocked = getCurDeviceOperator().isBlocked();
+ public ListenableFuture<?> isBlocked() {
+ ListenableFuture<?> blocked = getCurDeviceOperator().isBlocked();
if (!blocked.isDone()) {
return blocked;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
index f853d5d9cd..da7ffe6ab1 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
@@ -52,7 +52,7 @@ public class FillOperator implements ProcessOperator {
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
return child.isBlocked();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
index 70ccf2134f..91aeecec02 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
@@ -50,7 +50,7 @@ public class LastQueryMergeOperator implements
ProcessOperator {
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
if (currentIndex < inputOperatorsCount) {
return children.get(currentIndex).isBlocked();
} else {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
index 4ae909a2a8..ef3870fb17 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
@@ -46,7 +46,7 @@ public class LimitOperator implements ProcessOperator {
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
return child.isBlocked();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
index dbbe1b638a..8193e20684 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -72,7 +72,7 @@ public class LinearFillOperator implements ProcessOperator {
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
return child.isBlocked();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index b2136a5b4a..2820992745 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -46,7 +46,7 @@ public class OffsetOperator implements ProcessOperator {
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
return child.isBlocked();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index 2d355d8775..01b0625c60 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -89,7 +89,7 @@ public class RawDataAggregationOperator implements
ProcessOperator {
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
return child.isBlocked();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 93bfa854bb..2685e3e465 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -152,7 +152,7 @@ public class SlidingWindowAggregationOperator implements
ProcessOperator {
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
return child.isBlocked();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
index e3c297fe47..38bda24bf7 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
@@ -31,7 +31,7 @@ public class SortOperator implements ProcessOperator {
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
return ProcessOperator.super.isBlocked();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
index 219342fe5b..5a576ea5cc 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
@@ -31,9 +31,11 @@ import
org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
import java.util.List;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.util.concurrent.Futures.successfulAsList;
public class TimeJoinOperator implements ProcessOperator {
@@ -109,16 +111,17 @@ public class TimeJoinOperator implements ProcessOperator {
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
+ List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
for (int i = 0; i < inputOperatorsCount; i++) {
if (!noMoreTsBlocks[i] && empty(i)) {
- ListenableFuture<Void> blocked = children.get(i).isBlocked();
+ ListenableFuture<?> blocked = children.get(i).isBlocked();
if (!blocked.isDone()) {
- return blocked;
+ listenableFutures.add(blocked);
}
}
}
- return NOT_BLOCKED;
+ return listenableFutures.isEmpty() ? NOT_BLOCKED :
successfulAsList(listenableFutures);
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index fc24eac43d..6ed6ec9ca1 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -334,7 +334,7 @@ public class TransformOperator implements ProcessOperator {
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
return inputOperator.isBlocked();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
index f2ec8c0cc8..1bfebb251c 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
@@ -80,7 +80,7 @@ public class UpdateLastCacheOperator implements
ProcessOperator {
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
return child.isBlocked();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
index 5af5087bbe..a9176d658c 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
@@ -36,6 +36,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
public class CountMergeOperator implements ProcessOperator {
private final PlanNodeId planNodeId;
private final OperatorContext operatorContext;
@@ -56,14 +58,15 @@ public class CountMergeOperator implements ProcessOperator {
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
+ List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
for (Operator child : children) {
- ListenableFuture<Void> blocked = child.isBlocked();
+ ListenableFuture<?> blocked = child.isBlocked();
if (!blocked.isDone()) {
- return blocked;
+ listenableFutures.add(blocked);
}
}
- return NOT_BLOCKED;
+ return listenableFutures.isEmpty() ? NOT_BLOCKED :
successfulAsList(listenableFutures);
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
index e6e15b2e3f..22c1d61d92 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java
@@ -52,7 +52,7 @@ public class NodeManageMemoryMergeOperator implements
ProcessOperator {
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
return child.isBlocked();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
index bb1b50228d..e6bfa76e23 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java
@@ -53,7 +53,7 @@ public class NodePathsConvertOperator implements
ProcessOperator {
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
return child.isBlocked();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
index 3c0614589d..ca8f8ef5da 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java
@@ -70,8 +70,8 @@ public class NodePathsCountOperator implements
ProcessOperator {
}
@Override
- public ListenableFuture<Void> isBlocked() {
- ListenableFuture<Void> blocked = child.isBlocked();
+ public ListenableFuture<?> isBlocked() {
+ ListenableFuture<?> blocked = child.isBlocked();
while (child.hasNext() && blocked.isDone()) {
TsBlock tsBlock = child.next();
if (null != tsBlock && !tsBlock.isEmpty()) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
index 4c862fee6e..57ea220b63 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
@@ -64,7 +64,7 @@ public class SchemaFetchMergeOperator implements
ProcessOperator {
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
return children.get(currentIndex).isBlocked();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
index d594b38bd8..87df583494 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java
@@ -65,7 +65,7 @@ public class SchemaQueryMergeOperator implements
ProcessOperator {
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
return children.get(currentIndex).isBlocked();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
index 22b7156f28..7bb06a3114 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java
@@ -116,11 +116,11 @@ public class SchemaQueryOrderByHeatOperator implements
ProcessOperator {
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
for (int i = 0; i < operators.size(); i++) {
if (!noMoreTsBlocks[i]) {
Operator operator = operators.get(i);
- ListenableFuture<Void> blocked = operator.isBlocked();
+ ListenableFuture<?> blocked = operator.isBlocked();
while (operator.hasNext() && blocked.isDone()) {
TsBlock tsBlock = operator.next();
if (null != tsBlock && !tsBlock.isEmpty()) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index 6534cffb76..3579a642bf 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -33,7 +33,7 @@ public class ExchangeOperator implements SourceOperator {
private final PlanNodeId sourceId;
- private ListenableFuture<Void> isBlocked = NOT_BLOCKED;
+ private ListenableFuture<?> isBlocked = NOT_BLOCKED;
public ExchangeOperator(
OperatorContext operatorContext, ISourceHandle sourceHandle, PlanNodeId
sourceId) {
@@ -68,7 +68,7 @@ public class ExchangeOperator implements SourceOperator {
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
// Avoid registering a new callback in the source handle when one is
already pending
if (isBlocked.isDone()) {
isBlocked = sourceHandle.isBlocked();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
index b013377a13..02a795cff3 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java
@@ -55,7 +55,7 @@ public class DriverTaskThread extends AbstractDriverThread {
}
IDriver instance = task.getFragmentInstance();
CpuTimer timer = new CpuTimer();
- ListenableFuture<Void> future = instance.processFor(EXECUTION_TIME_SLICE);
+ ListenableFuture<?> future = instance.processFor(EXECUTION_TIME_SLICE);
CpuTimer.CpuDuration duration = timer.elapsedTime();
// long cost = System.nanoTime() - startTime;
// If the future is cancelled, the task is in an error and should be
thrown.
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
index 418e448759..f65c1f3988 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java
@@ -194,7 +194,7 @@ public class DriverTask implements IDIndexedAccessible {
}
@Override
- public ListenableFuture<Void> processFor(Duration duration) {
+ public ListenableFuture<?> processFor(Duration duration) {
return null;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index f6a775c150..7a9ab53fa4 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -289,7 +289,7 @@ public class QueryExecution implements IQueryExecution {
stateMachine.transitionToFinished();
return Optional.empty();
}
- ListenableFuture<Void> blocked = resultHandle.isBlocked();
+ ListenableFuture<?> blocked = resultHandle.isBlocked();
blocked.get();
if (!resultHandle.isFinished()) {
return Optional.of(resultHandle.receive());
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
index a6d7030a30..588407eae1 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
@@ -66,7 +66,7 @@ public class MemorySourceHandle implements ISourceHandle {
}
@Override
- public ListenableFuture<Void> isBlocked() {
+ public ListenableFuture<?> isBlocked() {
return immediateFuture(null);
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index feec870147..0d19e99889 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -186,7 +186,7 @@ public class DataDriverTest {
while (!dataDriver.isFinished()) {
assertEquals(FragmentInstanceState.RUNNING, stateMachine.getState());
- ListenableFuture<Void> blocked =
dataDriver.processFor(EXECUTION_TIME_SLICE);
+ ListenableFuture<?> blocked =
dataDriver.processFor(EXECUTION_TIME_SLICE);
assertTrue(blocked.isDone());
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
index 4c9d2da468..422139a959 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
@@ -137,7 +137,7 @@ public class LocalSinkHandleTest {
numOfSentTsblocks += 1;
}
Assert.assertEquals(6, numOfSentTsblocks);
- ListenableFuture<Void> blocked = localSinkHandle.isFull();
+ ListenableFuture<?> blocked = localSinkHandle.isFull();
Assert.assertFalse(blocked.isDone());
Assert.assertFalse(localSinkHandle.isFinished());
Assert.assertEquals(6 * mockTsBlockSize,
localSinkHandle.getBufferRetainedSizeInBytes());
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
index 1c3d6d5747..e1f49f5c77 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
@@ -71,7 +71,7 @@ public class LocalSourceHandleTest {
// Consume tsblocks.
Assert.assertTrue(localSourceHandle.isBlocked().isDone());
localSourceHandle.receive();
- ListenableFuture<Void> blocked = localSourceHandle.isBlocked();
+ ListenableFuture<?> blocked = localSourceHandle.isBlocked();
Assert.assertTrue(blocked.isDone());
Assert.assertFalse(localSourceHandle.isAborted());
Assert.assertTrue(localSourceHandle.isFinished());
@@ -103,7 +103,7 @@ public class LocalSourceHandleTest {
localPlanNodeId,
queue,
mockSourceHandleListener);
- ListenableFuture<Void> future = localSourceHandle.isBlocked();
+ ListenableFuture<?> future = localSourceHandle.isBlocked();
Assert.assertFalse(future.isDone());
Assert.assertFalse(localSourceHandle.isAborted());
Assert.assertFalse(localSourceHandle.isFinished());
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
index b6e332168b..5e29f47066 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
@@ -54,7 +54,7 @@ public class StubSinkHandle implements ISinkHandle {
}
@Override
- public ListenableFuture<Void> isFull() {
+ public ListenableFuture<?> isFull() {
return NOT_BLOCKED;
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/memory/MemorySourceHandleTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/memory/MemorySourceHandleTest.java
index fd91eb8e6b..7696e8d6f7 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/memory/MemorySourceHandleTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/memory/MemorySourceHandleTest.java
@@ -33,7 +33,7 @@ public class MemorySourceHandleTest {
TsBlock rawResult = new TsBlock(0);
MemorySourceHandle sourceHandle = new MemorySourceHandle(rawResult);
Assert.assertFalse(sourceHandle.isFinished());
- ListenableFuture<Void> blocked = sourceHandle.isBlocked();
+ ListenableFuture<?> blocked = sourceHandle.isBlocked();
Assert.assertTrue(blocked.isDone());
TsBlock result = sourceHandle.receive();
Assert.assertEquals(rawResult, result);
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
index 6f49600921..d8c4a4ddf7 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java
@@ -157,7 +157,8 @@ public class DriverTaskTimeoutSentinelThreadTest {
PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId,
"inst-0");
Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
-
Mockito.when(mockDriver.processFor(Mockito.any())).thenReturn(Futures.immediateVoidFuture());
+ Mockito.when(mockDriver.processFor(Mockito.any()))
+ .thenAnswer(ans -> Futures.immediateVoidFuture());
Mockito.when(mockDriver.isFinished()).thenReturn(true);
AbstractDriverThread executor =
new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue,
mockScheduler);
@@ -189,7 +190,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
new L1PriorityQueue<>(100, new DriverTask.TimeoutComparator(), new
DriverTask());
// Mock the instance with a blocked future
- ListenableFuture<Void> mockFuture = Mockito.mock(ListenableFuture.class);
+ ListenableFuture<?> mockFuture = Mockito.mock(ListenableFuture.class);
Mockito.when(mockFuture.isDone()).thenReturn(false);
Mockito.doAnswer(
ans -> {
@@ -205,7 +206,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId,
"inst-0");
Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
- Mockito.when(mockDriver.processFor(Mockito.any())).thenReturn(mockFuture);
+ Mockito.when(mockDriver.processFor(Mockito.any())).thenAnswer(ans ->
mockFuture);
Mockito.when(mockDriver.isFinished()).thenReturn(false);
AbstractDriverThread executor =
new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue,
mockScheduler);
@@ -237,7 +238,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
new L1PriorityQueue<>(100, new DriverTask.TimeoutComparator(), new
DriverTask());
// Mock the instance with a ready future
- ListenableFuture<Void> mockFuture = Mockito.mock(ListenableFuture.class);
+ ListenableFuture<?> mockFuture = Mockito.mock(ListenableFuture.class);
Mockito.when(mockFuture.isDone()).thenReturn(true);
Mockito.doAnswer(
ans -> {
@@ -253,7 +254,7 @@ public class DriverTaskTimeoutSentinelThreadTest {
PlanFragmentId fragmentId = new PlanFragmentId(queryId, 0);
FragmentInstanceId instanceId = new FragmentInstanceId(fragmentId,
"inst-0");
Mockito.when(mockDriver.getInfo()).thenReturn(instanceId);
- Mockito.when(mockDriver.processFor(Mockito.any())).thenReturn(mockFuture);
+ Mockito.when(mockDriver.processFor(Mockito.any())).thenAnswer(ans ->
mockFuture);
Mockito.when(mockDriver.isFinished()).thenReturn(false);
AbstractDriverThread executor =
new DriverTaskThread("0", new ThreadGroup("timeout-test"), taskQueue,
mockScheduler);