This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch iotdb-3629 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 89bc007d368981ba22f62386e0493a13ddce2cf5 Author: JackieTien97 <[email protected]> AuthorDate: Mon Jun 27 20:09:23 2022 +0800 [iotdb-3629] Fix TimeJoinOperator may cause Source handle is blocked exception --- .../iotdb/db/mpp/execution/driver/DataDriver.java | 2 +- .../iotdb/db/mpp/execution/driver/Driver.java | 22 ++++++++++------------ .../iotdb/db/mpp/execution/driver/IDriver.java | 2 +- .../db/mpp/execution/driver/SchemaDriver.java | 2 +- .../db/mpp/execution/exchange/ISinkHandle.java | 2 +- .../db/mpp/execution/exchange/ISourceHandle.java | 2 +- .../db/mpp/execution/exchange/LocalSinkHandle.java | 2 +- .../mpp/execution/exchange/LocalSourceHandle.java | 2 +- .../db/mpp/execution/exchange/SinkHandle.java | 2 +- .../db/mpp/execution/exchange/SourceHandle.java | 2 +- .../iotdb/db/mpp/execution/operator/Operator.java | 4 ++-- .../operator/process/AggregationOperator.java | 2 +- .../operator/process/DeviceMergeOperator.java | 12 ++++++++---- .../operator/process/DeviceViewOperator.java | 4 ++-- .../execution/operator/process/FillOperator.java | 2 +- .../operator/process/LastQueryMergeOperator.java | 2 +- .../execution/operator/process/LimitOperator.java | 2 +- .../operator/process/LinearFillOperator.java | 2 +- .../execution/operator/process/OffsetOperator.java | 2 +- .../process/RawDataAggregationOperator.java | 2 +- .../process/SlidingWindowAggregationOperator.java | 2 +- .../execution/operator/process/SortOperator.java | 2 +- .../operator/process/TimeJoinOperator.java | 11 +++++++---- .../operator/process/TransformOperator.java | 2 +- .../operator/process/UpdateLastCacheOperator.java | 2 +- .../operator/schema/CountMergeOperator.java | 11 +++++++---- .../schema/NodeManageMemoryMergeOperator.java | 2 +- .../operator/schema/NodePathsConvertOperator.java | 2 +- .../operator/schema/NodePathsCountOperator.java | 4 ++-- .../operator/schema/SchemaFetchMergeOperator.java | 2 +- .../operator/schema/SchemaQueryMergeOperator.java | 2 +- .../schema/SchemaQueryOrderByHeatOperator.java | 2 +- .../operator/source/ExchangeOperator.java | 4 ++-- .../mpp/execution/schedule/DriverTaskThread.java | 2 +- .../db/mpp/execution/schedule/task/DriverTask.java | 2 +- .../db/mpp/plan/execution/QueryExecution.java | 2 +- .../plan/execution/memory/MemorySourceHandle.java | 2 +- .../iotdb/db/mpp/execution/DataDriverTest.java | 2 +- .../execution/exchange/LocalSinkHandleTest.java | 2 +- .../execution/exchange/LocalSourceHandleTest.java | 4 ++-- .../db/mpp/execution/exchange/StubSinkHandle.java | 2 +- .../execution/memory/MemorySourceHandleTest.java | 2 +- .../DriverTaskTimeoutSentinelThreadTest.java | 4 ++-- 43 files changed, 77 insertions(+), 69 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java index 18f55110d8..ba6909e4fa 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java @@ -60,7 +60,7 @@ public class DataDriver extends Driver { } @Override - protected boolean init(SettableFuture<Void> blockedFuture) { + protected boolean init(SettableFuture<?> blockedFuture) { if (!init) { try { initialize(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java index e01e2aefd4..acf8e89470 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java @@ -54,8 +54,7 @@ public abstract class Driver implements IDriver { protected final Operator root; protected final ISinkHandle sinkHandle; protected final DriverContext driverContext; - protected final AtomicReference<SettableFuture<Void>> driverBlockedFuture = - new AtomicReference<>(); + protected final AtomicReference<SettableFuture<?>> driverBlockedFuture = new AtomicReference<>(); protected final AtomicReference<State> state = new AtomicReference<>(State.ALIVE); protected final DriverLock exclusiveLock = new DriverLock(); @@ -93,15 +92,15 @@ public abstract class Driver implements IDriver { * * @return true if init succeed, false otherwise */ - protected abstract boolean init(SettableFuture<Void> blockedFuture); + protected abstract boolean init(SettableFuture<?> blockedFuture); /** release resource this driver used */ protected abstract void releaseResource(); @Override - public ListenableFuture<Void> processFor(Duration duration) { + public ListenableFuture<?> processFor(Duration duration) { - SettableFuture<Void> blockedFuture = driverBlockedFuture.get(); + SettableFuture<?> blockedFuture = driverBlockedFuture.get(); // initialization may be time-consuming, so we keep it in the processFor method // in normal case, it won't cause deadlock and should finish soon, otherwise it will be a // critical bug @@ -116,7 +115,7 @@ public abstract class Driver implements IDriver { long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS); - Optional<ListenableFuture<Void>> result = + Optional<ListenableFuture<?>> result = tryWithLock( 100, TimeUnit.MILLISECONDS, @@ -124,7 +123,7 @@ public abstract class Driver implements IDriver { () -> { long start = System.nanoTime(); do { - ListenableFuture<Void> future = processInternal(); + ListenableFuture<?> future = processInternal(); if (!future.isDone()) { return updateDriverBlockedFuture(future); } @@ -174,9 +173,9 @@ public abstract class Driver implements IDriver { return finished; } - private ListenableFuture<Void> processInternal() { + private ListenableFuture<?> processInternal() { try { - ListenableFuture<Void> blocked = root.isBlocked(); + ListenableFuture<?> blocked = root.isBlocked(); if (!blocked.isDone()) { return blocked; } @@ -211,11 +210,10 @@ public abstract class Driver implements IDriver { } } - private ListenableFuture<Void> updateDriverBlockedFuture( - ListenableFuture<Void> sourceBlockedFuture) { + private ListenableFuture<?> updateDriverBlockedFuture(ListenableFuture<?> sourceBlockedFuture) { // driverBlockedFuture will be completed as soon as the sourceBlockedFuture is completed // or any of the operators gets a memory revocation request - SettableFuture<Void> newDriverBlockedFuture = SettableFuture.create(); + SettableFuture<?> newDriverBlockedFuture = SettableFuture.create(); driverBlockedFuture.set(newDriverBlockedFuture); sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), directExecutor()); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java index 3111a22c49..0d097bfc7a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java @@ -47,7 +47,7 @@ public interface IDriver { * processing. Otherwise, meaning that this IDriver is blocked and not ready for next * processing. */ - ListenableFuture<Void> processFor(Duration duration); + ListenableFuture<?> processFor(Duration duration); /** * the id information about this IDriver. diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java index 5c52aa1d20..01ed2315b8 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java @@ -34,7 +34,7 @@ public class SchemaDriver extends Driver { } @Override - protected boolean init(SettableFuture<Void> blockedFuture) { + protected boolean init(SettableFuture<?> blockedFuture) { return true; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java index 27ac4cbafb..b4be67e164 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java @@ -34,7 +34,7 @@ public interface ISinkHandle { long getBufferRetainedSizeInBytes(); /** Get a future that will be completed when the output buffer is not full. */ - ListenableFuture<Void> isFull(); + ListenableFuture<?> isFull(); /** * Send a list of tsblocks to an unpartitioned output buffer. If no-more-tsblocks has been set, diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java index 376de3cf5d..be62bff4ff 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java @@ -44,7 +44,7 @@ public interface ISourceHandle { boolean isFinished(); /** Get a future that will be completed when the input buffer is not empty. */ - ListenableFuture<Void> isBlocked(); + ListenableFuture<?> isBlocked(); /** If this handle is aborted. */ boolean isAborted(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java index c7b4b982ad..852e79ac8b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java @@ -71,7 +71,7 @@ public class LocalSinkHandle implements ISinkHandle { } @Override - public synchronized ListenableFuture<Void> isFull() { + public synchronized ListenableFuture<?> isFull() { if (aborted) { throw new IllegalStateException("Sink handle is closed."); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java index 8f47f9d852..c101e7b644 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java @@ -122,7 +122,7 @@ public class LocalSourceHandle implements ISourceHandle { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { if (aborted) { throw new IllegalStateException("Source handle is closed."); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java index 71fe329ed4..3619de44f4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java @@ -109,7 +109,7 @@ public class SinkHandle implements ISinkHandle { } @Override - public synchronized ListenableFuture<Void> isFull() { + public synchronized ListenableFuture<?> isFull() { if (aborted) { throw new IllegalStateException("Sink handle is aborted."); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java index a19d080d07..facd0c1e0f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java @@ -201,7 +201,7 @@ public class SourceHandle implements ISourceHandle { } @Override - public synchronized ListenableFuture<Void> isBlocked() { + public synchronized ListenableFuture<?> isBlocked() { if (aborted) { throw new IllegalStateException("Source handle is aborted."); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java index 03c12262dd..dfa08e033f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/Operator.java @@ -26,7 +26,7 @@ import static com.google.common.util.concurrent.Futures.immediateVoidFuture; public interface Operator extends AutoCloseable { - ListenableFuture<Void> NOT_BLOCKED = immediateVoidFuture(); + ListenableFuture<?> NOT_BLOCKED = immediateVoidFuture(); OperatorContext getOperatorContext(); @@ -34,7 +34,7 @@ public interface Operator extends AutoCloseable { * Returns a future that will be completed when the operator becomes unblocked. If the operator is * not blocked, this method should return {@code NOT_BLOCKED}. */ - default ListenableFuture<Void> isBlocked() { + default ListenableFuture<?> isBlocked() { return NOT_BLOCKED; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java index 4dd1d9c411..36d3495f30 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java @@ -85,7 +85,7 @@ public class AggregationOperator implements ProcessOperator { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { for (int i = 0; i < inputOperatorsCount; i++) { ListenableFuture<Void> blocked = children.get(i).isBlocked(); if (!blocked.isDone()) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java index 3e4eacb4a7..d16d47aa28 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java @@ -32,9 +32,12 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; import com.google.common.util.concurrent.ListenableFuture; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import static com.google.common.util.concurrent.Futures.successfulAsList; + /** * DeviceMergeOperator is responsible for merging tsBlock coming from DeviceViewOperators. * @@ -94,16 +97,17 @@ public class DeviceMergeOperator implements ProcessOperator { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { + List<ListenableFuture<?>> listenableFutures = new ArrayList<>(); for (int i = 0; i < inputOperatorsCount; i++) { if (!noMoreTsBlocks[i] && isTsBlockEmpty(i)) { - ListenableFuture<Void> blocked = deviceOperators.get(i).isBlocked(); + ListenableFuture<?> blocked = deviceOperators.get(i).isBlocked(); if (!blocked.isDone()) { - return blocked; + listenableFutures.add(blocked); } } } - return NOT_BLOCKED; + return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java index a95bee8cf1..127a26a8b6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java @@ -94,8 +94,8 @@ public class DeviceViewOperator implements ProcessOperator { } @Override - public ListenableFuture<Void> isBlocked() { - ListenableFuture<Void> blocked = getCurDeviceOperator().isBlocked(); + public ListenableFuture<?> isBlocked() { + ListenableFuture<?> blocked = getCurDeviceOperator().isBlocked(); if (!blocked.isDone()) { return blocked; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java index f853d5d9cd..da7ffe6ab1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java @@ -52,7 +52,7 @@ public class FillOperator implements ProcessOperator { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { return child.isBlocked(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java index 70ccf2134f..91aeecec02 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java @@ -50,7 +50,7 @@ public class LastQueryMergeOperator implements ProcessOperator { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { if (currentIndex < inputOperatorsCount) { return children.get(currentIndex).isBlocked(); } else { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java index 4ae909a2a8..ef3870fb17 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java @@ -46,7 +46,7 @@ public class LimitOperator implements ProcessOperator { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { return child.isBlocked(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java index dbbe1b638a..8193e20684 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java @@ -72,7 +72,7 @@ public class LinearFillOperator implements ProcessOperator { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { return child.isBlocked(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java index b2136a5b4a..2820992745 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java @@ -46,7 +46,7 @@ public class OffsetOperator implements ProcessOperator { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { return child.isBlocked(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java index 2d355d8775..01b0625c60 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java @@ -89,7 +89,7 @@ public class RawDataAggregationOperator implements ProcessOperator { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { return child.isBlocked(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java index 93bfa854bb..2685e3e465 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java @@ -152,7 +152,7 @@ public class SlidingWindowAggregationOperator implements ProcessOperator { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { return child.isBlocked(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java index e3c297fe47..38bda24bf7 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java @@ -31,7 +31,7 @@ public class SortOperator implements ProcessOperator { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { return ProcessOperator.super.isBlocked(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java index 219342fe5b..5a576ea5cc 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java @@ -31,9 +31,11 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; import com.google.common.util.concurrent.ListenableFuture; +import java.util.ArrayList; import java.util.List; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.util.concurrent.Futures.successfulAsList; public class TimeJoinOperator implements ProcessOperator { @@ -109,16 +111,17 @@ public class TimeJoinOperator implements ProcessOperator { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { + List<ListenableFuture<?>> listenableFutures = new ArrayList<>(); for (int i = 0; i < inputOperatorsCount; i++) { if (!noMoreTsBlocks[i] && empty(i)) { - ListenableFuture<Void> blocked = children.get(i).isBlocked(); + ListenableFuture<?> blocked = children.get(i).isBlocked(); if (!blocked.isDone()) { - return blocked; + listenableFutures.add(blocked); } } } - return NOT_BLOCKED; + return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java index fc24eac43d..6ed6ec9ca1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java @@ -334,7 +334,7 @@ public class TransformOperator implements ProcessOperator { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { return inputOperator.isBlocked(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java index f2ec8c0cc8..1bfebb251c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java @@ -80,7 +80,7 @@ public class UpdateLastCacheOperator implements ProcessOperator { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { return child.isBlocked(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java index 5af5087bbe..a9176d658c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java @@ -36,6 +36,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static com.google.common.util.concurrent.Futures.successfulAsList; + public class CountMergeOperator implements ProcessOperator { private final PlanNodeId planNodeId; private final OperatorContext operatorContext; @@ -56,14 +58,15 @@ public class CountMergeOperator implements ProcessOperator { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { + List<ListenableFuture<?>> listenableFutures = new ArrayList<>(); for (Operator child : children) { - ListenableFuture<Void> blocked = child.isBlocked(); + ListenableFuture<?> blocked = child.isBlocked(); if (!blocked.isDone()) { - return blocked; + listenableFutures.add(blocked); } } - return NOT_BLOCKED; + return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java index e6e15b2e3f..22c1d61d92 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java @@ -52,7 +52,7 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { return child.isBlocked(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java index bb1b50228d..e6bfa76e23 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java @@ -53,7 +53,7 @@ public class NodePathsConvertOperator implements ProcessOperator { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { return child.isBlocked(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java index 3c0614589d..ca8f8ef5da 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java @@ -70,8 +70,8 @@ public class NodePathsCountOperator implements ProcessOperator { } @Override - public ListenableFuture<Void> isBlocked() { - ListenableFuture<Void> blocked = child.isBlocked(); + public ListenableFuture<?> isBlocked() { + ListenableFuture<?> blocked = child.isBlocked(); while (child.hasNext() && blocked.isDone()) { TsBlock tsBlock = child.next(); if (null != tsBlock && !tsBlock.isEmpty()) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java index 4c862fee6e..57ea220b63 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java @@ -64,7 +64,7 @@ public class SchemaFetchMergeOperator implements ProcessOperator { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { return children.get(currentIndex).isBlocked(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java index d594b38bd8..87df583494 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java @@ -65,7 +65,7 @@ public class SchemaQueryMergeOperator implements ProcessOperator { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { return children.get(currentIndex).isBlocked(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java index 22b7156f28..b29c3805ae 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java @@ -116,7 +116,7 @@ public class SchemaQueryOrderByHeatOperator implements ProcessOperator { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { for (int i = 0; i < operators.size(); i++) { if (!noMoreTsBlocks[i]) { Operator operator = operators.get(i); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java index 6534cffb76..3579a642bf 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java @@ -33,7 +33,7 @@ public class ExchangeOperator implements SourceOperator { private final PlanNodeId sourceId; - private ListenableFuture<Void> isBlocked = NOT_BLOCKED; + private ListenableFuture<?> isBlocked = NOT_BLOCKED; public ExchangeOperator( OperatorContext operatorContext, ISourceHandle sourceHandle, PlanNodeId sourceId) { @@ -68,7 +68,7 @@ public class ExchangeOperator implements SourceOperator { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { // Avoid registering a new callback in the source handle when one is already pending if (isBlocked.isDone()) { isBlocked = sourceHandle.isBlocked(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java index b013377a13..02a795cff3 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java @@ -55,7 +55,7 @@ public class DriverTaskThread extends AbstractDriverThread { } IDriver instance = task.getFragmentInstance(); CpuTimer timer = new CpuTimer(); - ListenableFuture<Void> future = instance.processFor(EXECUTION_TIME_SLICE); + ListenableFuture<?> future = instance.processFor(EXECUTION_TIME_SLICE); CpuTimer.CpuDuration duration = timer.elapsedTime(); // long cost = System.nanoTime() - startTime; // If the future is cancelled, the task is in an error and should be thrown. diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java index 418e448759..f65c1f3988 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/task/DriverTask.java @@ -194,7 +194,7 @@ public class DriverTask implements IDIndexedAccessible { } @Override - public ListenableFuture<Void> processFor(Duration duration) { + public ListenableFuture<?> processFor(Duration duration) { return null; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java index f6a775c150..7a9ab53fa4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java @@ -289,7 +289,7 @@ public class QueryExecution implements IQueryExecution { stateMachine.transitionToFinished(); return Optional.empty(); } - ListenableFuture<Void> blocked = resultHandle.isBlocked(); + ListenableFuture<?> blocked = resultHandle.isBlocked(); blocked.get(); if (!resultHandle.isFinished()) { return Optional.of(resultHandle.receive()); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java index a6d7030a30..588407eae1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java @@ -66,7 +66,7 @@ public class MemorySourceHandle implements ISourceHandle { } @Override - public ListenableFuture<Void> isBlocked() { + public ListenableFuture<?> isBlocked() { return immediateFuture(null); } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java index feec870147..0d19e99889 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java @@ -186,7 +186,7 @@ public class DataDriverTest { while (!dataDriver.isFinished()) { assertEquals(FragmentInstanceState.RUNNING, stateMachine.getState()); - ListenableFuture<Void> blocked = dataDriver.processFor(EXECUTION_TIME_SLICE); + ListenableFuture<?> blocked = dataDriver.processFor(EXECUTION_TIME_SLICE); assertTrue(blocked.isDone()); } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java index 4c9d2da468..422139a959 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java @@ -137,7 +137,7 @@ public class LocalSinkHandleTest { numOfSentTsblocks += 1; } Assert.assertEquals(6, numOfSentTsblocks); - ListenableFuture<Void> blocked = localSinkHandle.isFull(); + ListenableFuture<?> blocked = localSinkHandle.isFull(); Assert.assertFalse(blocked.isDone()); Assert.assertFalse(localSinkHandle.isFinished()); Assert.assertEquals(6 * mockTsBlockSize, localSinkHandle.getBufferRetainedSizeInBytes()); diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java index 1c3d6d5747..e1f49f5c77 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java @@ -71,7 +71,7 @@ public class LocalSourceHandleTest { // Consume tsblocks. Assert.assertTrue(localSourceHandle.isBlocked().isDone()); localSourceHandle.receive(); - ListenableFuture<Void> blocked = localSourceHandle.isBlocked(); + ListenableFuture<?> blocked = localSourceHandle.isBlocked(); Assert.assertTrue(blocked.isDone()); Assert.assertFalse(localSourceHandle.isAborted()); Assert.assertTrue(localSourceHandle.isFinished()); @@ -103,7 +103,7 @@ public class LocalSourceHandleTest { localPlanNodeId, queue, mockSourceHandleListener); - ListenableFuture<Void> future = localSourceHandle.isBlocked(); + ListenableFuture<?> future = localSourceHandle.isBlocked(); Assert.assertFalse(future.isDone()); Assert.assertFalse(localSourceHandle.isAborted()); Assert.assertFalse(localSourceHandle.isFinished()); diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java index b6e332168b..5e29f47066 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java @@ -54,7 +54,7 @@ public class StubSinkHandle implements ISinkHandle { } @Override - public ListenableFuture<Void> isFull() { + public ListenableFuture<?> isFull() { return NOT_BLOCKED; } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/memory/MemorySourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/memory/MemorySourceHandleTest.java index fd91eb8e6b..7696e8d6f7 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/memory/MemorySourceHandleTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/memory/MemorySourceHandleTest.java @@ -33,7 +33,7 @@ public class MemorySourceHandleTest { TsBlock rawResult = new TsBlock(0); MemorySourceHandle sourceHandle = new MemorySourceHandle(rawResult); Assert.assertFalse(sourceHandle.isFinished()); - ListenableFuture<Void> blocked = sourceHandle.isBlocked(); + ListenableFuture<?> blocked = sourceHandle.isBlocked(); Assert.assertTrue(blocked.isDone()); TsBlock result = sourceHandle.receive(); Assert.assertEquals(rawResult, result); diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java index 6f49600921..6bb9929d65 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java @@ -189,7 +189,7 @@ public class DriverTaskTimeoutSentinelThreadTest { new L1PriorityQueue<>(100, new DriverTask.TimeoutComparator(), new DriverTask()); // Mock the instance with a blocked future - ListenableFuture<Void> mockFuture = Mockito.mock(ListenableFuture.class); + ListenableFuture<?> mockFuture = Mockito.mock(ListenableFuture.class); Mockito.when(mockFuture.isDone()).thenReturn(false); Mockito.doAnswer( ans -> { @@ -237,7 +237,7 @@ public class DriverTaskTimeoutSentinelThreadTest { new L1PriorityQueue<>(100, new DriverTask.TimeoutComparator(), new DriverTask()); // Mock the instance with a ready future - ListenableFuture<Void> mockFuture = Mockito.mock(ListenableFuture.class); + ListenableFuture<?> mockFuture = Mockito.mock(ListenableFuture.class); Mockito.when(mockFuture.isDone()).thenReturn(true); Mockito.doAnswer( ans -> {
