This is an automated email from the ASF dual-hosted git repository.
Wei-hao-Li pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 022e11332e6 Enable show queries to be executed immediately when the
available memory in the memoryPool is insufficient (#17507)
022e11332e6 is described below
commit 022e11332e6e5e8e6500e497357b2fd4f5949c7e
Author: Weihao Li <[email protected]>
AuthorDate: Tue Apr 21 15:45:20 2026 +0800
Enable show queries to be executed immediately when the available memory in
the memoryPool is insufficient (#17507)
---
.../db/queryengine/common/MPPQueryContext.java | 14 +++
.../execution/exchange/MPPDataExchangeManager.java | 56 +++++++++-
.../execution/exchange/SharedTsBlockQueue.java | 57 ++++++----
.../execution/exchange/sink/SinkChannel.java | 62 ++++++++---
.../execution/exchange/source/SourceHandle.java | 90 ++++++++++++----
.../fragment/FragmentInstanceContext.java | 13 +++
.../fragment/FragmentInstanceManager.java | 2 +
.../queryengine/execution/memory/MemoryPool.java | 56 +++++++---
.../db/queryengine/plan/analyze/Analysis.java | 2 +-
.../queryengine/plan/execution/QueryExecution.java | 7 +-
.../plan/planner/OperatorTreeGenerator.java | 6 +-
.../plan/planner/TableOperatorGenerator.java | 6 +-
.../plan/planner/plan/FragmentInstance.java | 2 +
.../execution/exchange/LocalSinkChannelTest.java | 10 +-
.../execution/exchange/SharedTsBlockQueueTest.java | 11 +-
.../execution/exchange/ShuffleSinkHandleTest.java | 5 +-
.../execution/exchange/SourceHandleTest.java | 5 +-
.../db/queryengine/execution/exchange/Utils.java | 26 +++--
.../execution/memory/MemoryPoolTest.java | 115 ++++++++++++++++++---
19 files changed, 430 insertions(+), 115 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index a507b98008b..db2e6b7d44b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -129,6 +129,12 @@ public class MPPQueryContext implements IAuditEntity {
private boolean userQuery = false;
+ /**
+ * When true (e.g. SHOW QUERIES), operator and exchange memory may use fallback when pool is
+ * insufficient. Set from analysis via {@link #setNeedSetHighestPriority(boolean)}.
+ */
+ private boolean needSetHighestPriority = false;
+
private boolean debug = false;
private Map<NodeRef<Table>, Query> cteQueries = new HashMap<>();
@@ -507,6 +513,14 @@ public class MPPQueryContext implements IAuditEntity {
this.userQuery = userQuery;
}
+ public boolean needSetHighestPriority() {
+ return needSetHighestPriority;
+ }
+
+ public void setNeedSetHighestPriority(boolean needSetHighestPriority) {
+ this.needSetHighestPriority = needSetHighestPriority;
+ }
+
public boolean isDebug() {
return debug;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
index d2caf330d66..11393dc5d7e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import
org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
+import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.db.queryengine.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException;
import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
import
org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelIndex;
@@ -656,7 +657,11 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
}
queue =
new SharedTsBlockQueue(
- localFragmentInstanceId, localPlanNodeId, localMemoryManager,
executorService);
+ localFragmentInstanceId,
+ localPlanNodeId,
+ localMemoryManager,
+ executorService,
+ instanceContext.isHighestPriority());
}
return new LocalSinkChannel(
@@ -680,7 +685,8 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(),
planNodeId,
localMemoryManager,
- executorService);
+ executorService,
+ driverContext.getFragmentInstanceContext().isHighestPriority());
queue.allowAddingTsBlock();
return new LocalSinkChannel(
queue,
@@ -718,6 +724,7 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
tsBlockSerdeFactory.get(),
new ISinkChannelListenerImpl(
localFragmentInstanceId, instanceContext, instanceContext::failed,
cnt),
+ instanceContext.isHighestPriority(),
mppDataExchangeServiceClientManager);
}
@@ -802,6 +809,7 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
context.getDriverTaskID().toString());
}
+ @TestOnly
public synchronized ISourceHandle createLocalSourceHandleForFragment(
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
@@ -809,6 +817,24 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
TFragmentInstanceId remoteFragmentInstanceId,
int index,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
+ return createLocalSourceHandleForFragment(
+ localFragmentInstanceId,
+ localPlanNodeId,
+ remotePlanNodeId,
+ remoteFragmentInstanceId,
+ index,
+ onFailureCallback,
+ false);
+ }
+
+ public synchronized ISourceHandle createLocalSourceHandleForFragment(
+ TFragmentInstanceId localFragmentInstanceId,
+ String localPlanNodeId,
+ String remotePlanNodeId,
+ TFragmentInstanceId remoteFragmentInstanceId,
+ int index,
+ IMPPDataExchangeManagerCallback<Throwable> onFailureCallback,
+ boolean isHighestPriority) {
if (sourceHandles.containsKey(localFragmentInstanceId)
&&
sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
throw new IllegalStateException(
@@ -840,7 +866,11 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
}
queue =
new SharedTsBlockQueue(
- remoteFragmentInstanceId, remotePlanNodeId, localMemoryManager,
executorService);
+ remoteFragmentInstanceId,
+ remotePlanNodeId,
+ localMemoryManager,
+ executorService,
+ isHighestPriority);
}
LocalSourceHandle localSourceHandle =
new LocalSourceHandle(
@@ -854,6 +884,7 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
return localSourceHandle;
}
+ @TestOnly
@Override
public ISourceHandle createSourceHandle(
TFragmentInstanceId localFragmentInstanceId,
@@ -862,6 +893,24 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
+ return createSourceHandle(
+ localFragmentInstanceId,
+ localPlanNodeId,
+ indexOfUpstreamSinkHandle,
+ remoteEndpoint,
+ remoteFragmentInstanceId,
+ onFailureCallback,
+ false);
+ }
+
+ public ISourceHandle createSourceHandle(
+ TFragmentInstanceId localFragmentInstanceId,
+ String localPlanNodeId,
+ int indexOfUpstreamSinkHandle,
+ TEndPoint remoteEndpoint,
+ TFragmentInstanceId remoteFragmentInstanceId,
+ IMPPDataExchangeManagerCallback<Throwable> onFailureCallback,
+ boolean isHighestPriority) {
Map<String, ISourceHandle> sourceHandleMap =
sourceHandles.get(localFragmentInstanceId);
if (sourceHandleMap != null &&
sourceHandleMap.containsKey(localPlanNodeId)) {
throw new IllegalStateException(
@@ -891,6 +940,7 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
executorService,
tsBlockSerdeFactory.get(),
new SourceHandleListenerImpl(onFailureCallback),
+ isHighestPriority,
mppDataExchangeServiceClientManager);
sourceHandles
.computeIfAbsent(localFragmentInstanceId, key -> new
ConcurrentHashMap<>())
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
index e49efabb986..e0cea9dcaff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
@@ -19,11 +19,13 @@
package org.apache.iotdb.db.queryengine.execution.exchange;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import
org.apache.iotdb.db.queryengine.execution.exchange.sink.LocalSinkChannel;
import
org.apache.iotdb.db.queryengine.execution.exchange.source.LocalSourceHandle;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
+import
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
@@ -63,7 +65,7 @@ public class SharedTsBlockQueue {
private long bufferRetainedSizeInBytes = 0L;
- private final Queue<TsBlock> queue = new LinkedList<>();
+ private final Queue<Pair<TsBlock, Long>> queue = new LinkedList<>();
private SettableFuture<Void> blocked = SettableFuture.create();
@@ -83,17 +85,28 @@ public class SharedTsBlockQueue {
private long maxBytesCanReserve =
IoTDBDescriptor.getInstance().getMemoryConfig().getMaxBytesPerFragmentInstance();
+ private final boolean isHighestPriority;
private volatile Throwable abortedCause = null;
// used for SharedTsBlockQueue listener
private final ExecutorService executorService;
+ @TestOnly
public SharedTsBlockQueue(
TFragmentInstanceId fragmentInstanceId,
String planNodeId,
LocalMemoryManager localMemoryManager,
ExecutorService executorService) {
+ this(fragmentInstanceId, planNodeId, localMemoryManager, executorService,
false);
+ }
+
+ public SharedTsBlockQueue(
+ TFragmentInstanceId fragmentInstanceId,
+ String planNodeId,
+ LocalMemoryManager localMemoryManager,
+ ExecutorService executorService,
+ boolean isHighestPriority) {
this.localFragmentInstanceId =
Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be
null");
this.fullFragmentInstanceId =
@@ -102,6 +115,7 @@ public class SharedTsBlockQueue {
this.localMemoryManager =
Validate.notNull(localMemoryManager, "local memory manager cannot be
null");
this.executorService = Validate.notNull(executorService, "ExecutorService
can not be null.");
+ this.isHighestPriority = isHighestPriority;
}
public boolean hasNoMoreTsBlocks() {
@@ -196,15 +210,18 @@ public class SharedTsBlockQueue {
}
throw new IllegalStateException("queue has been destroyed");
}
- TsBlock tsBlock = queue.remove();
- localMemoryManager
- .getQueryPool()
- .free(
- localFragmentInstanceId.getQueryId(),
- fullFragmentInstanceId,
- localPlanNodeId,
- tsBlock.getSizeInBytes());
- bufferRetainedSizeInBytes -= tsBlock.getSizeInBytes();
+ Pair<TsBlock, Long> tsBlockWithReservedBytes = queue.remove();
+ long reservedBytes = tsBlockWithReservedBytes.right;
+ if (reservedBytes > 0) {
+ localMemoryManager
+ .getQueryPool()
+ .free(
+ localFragmentInstanceId.getQueryId(),
+ fullFragmentInstanceId,
+ localPlanNodeId,
+ reservedBytes);
+ bufferRetainedSizeInBytes -= reservedBytes;
+ }
// Every time LocalSourceHandle consumes a TsBlock, it needs to send the
event
// to
// corresponding LocalSinkChannel.
@@ -214,7 +231,7 @@ public class SharedTsBlockQueue {
if (blocked.isDone() && queue.isEmpty() && !noMoreTsBlocks) {
blocked = SettableFuture.create();
}
- return tsBlock;
+ return tsBlockWithReservedBytes.left;
}
/**
@@ -240,20 +257,22 @@ public class SharedTsBlockQueue {
localFragmentInstanceId.queryId, fullFragmentInstanceId,
localPlanNodeId);
alreadyRegistered = true;
}
- Pair<ListenableFuture<Void>, Boolean> pair =
+ MemoryReservationResult reserveResult =
localMemoryManager
.getQueryPool()
- .reserve(
+ .reserveWithPriority(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
tsBlock.getSizeInBytes(),
- maxBytesCanReserve);
- blockedOnMemory = pair.left;
- bufferRetainedSizeInBytes += tsBlock.getSizeInBytes();
+ maxBytesCanReserve,
+ isHighestPriority);
+ blockedOnMemory = reserveResult.getFuture();
+ long reservedBytes = reserveResult.getReservedBytes();
+ bufferRetainedSizeInBytes += reservedBytes;
// reserve memory failed, we should wait until there is enough memory
- if (!Boolean.TRUE.equals(pair.right)) {
+ if (!reserveResult.isReserveSuccess()) {
SettableFuture<Void> channelBlocked = SettableFuture.create();
blockedOnMemory.addListener(
() -> {
@@ -268,7 +287,7 @@ public class SharedTsBlockQueue {
channelBlocked.set(null);
return;
}
- queue.add(tsBlock);
+ queue.add(new Pair<>(tsBlock, reservedBytes));
if (!blocked.isDone()) {
blocked.set(null);
}
@@ -285,7 +304,7 @@ public class SharedTsBlockQueue {
executorService);
return channelBlocked;
} else { // reserve memory succeeded, add the TsBlock directly
- queue.add(tsBlock);
+ queue.add(new Pair<>(tsBlock, reservedBytes));
if (!blocked.isDone()) {
blocked.set(null);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
index 9cb624e4d96..3a19183f8e8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import
org.apache.iotdb.db.queryengine.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException;
import
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
+import
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
import org.apache.iotdb.db.utils.SetThreadName;
@@ -119,6 +120,8 @@ public class SinkChannel implements ISinkChannel {
private long maxBytesCanReserve =
IoTDBDescriptor.getInstance().getMemoryConfig().getMaxBytesPerFragmentInstance();
+ private final boolean isHighestPriority;
+
private static final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRIC_SET
=
DataExchangeCostMetricSet.getInstance();
private static final DataExchangeCountMetricSet
DATA_EXCHANGE_COUNT_METRIC_SET =
@@ -128,6 +131,34 @@ public class SinkChannel implements ISinkChannel {
RamUsageEstimator.shallowSizeOfInstance(SinkChannel.class)
+ RamUsageEstimator.shallowSizeOfInstance(TFragmentInstanceId.class)
* 2;
+ @TestOnly
+ @SuppressWarnings("squid:S107")
+ public SinkChannel(
+ TEndPoint remoteEndpoint,
+ TFragmentInstanceId remoteFragmentInstanceId,
+ String remotePlanNodeId,
+ String localPlanNodeId,
+ TFragmentInstanceId localFragmentInstanceId,
+ LocalMemoryManager localMemoryManager,
+ ExecutorService executorService,
+ TsBlockSerde serde,
+ SinkListener sinkListener,
+ IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
+ mppDataExchangeServiceClientManager) {
+ this(
+ remoteEndpoint,
+ remoteFragmentInstanceId,
+ remotePlanNodeId,
+ localPlanNodeId,
+ localFragmentInstanceId,
+ localMemoryManager,
+ executorService,
+ serde,
+ sinkListener,
+ false,
+ mppDataExchangeServiceClientManager);
+ }
+
@SuppressWarnings("squid:S107")
public SinkChannel(
TEndPoint remoteEndpoint,
@@ -139,6 +170,7 @@ public class SinkChannel implements ISinkChannel {
ExecutorService executorService,
TsBlockSerde serde,
SinkListener sinkListener,
+ boolean isHighestPriority,
IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
mppDataExchangeServiceClientManager) {
this.remoteEndpoint = Validate.notNull(remoteEndpoint, "remoteEndPoint can
not be null.");
@@ -155,6 +187,7 @@ public class SinkChannel implements ISinkChannel {
this.executorService = Validate.notNull(executorService, "executorService
can not be null.");
this.serde = Validate.notNull(serde, "serde can not be null.");
this.sinkListener = Validate.notNull(sinkListener, "sinkListener can not
be null.");
+ this.isHighestPriority = isHighestPriority;
this.mppDataExchangeServiceClientManager =
mppDataExchangeServiceClientManager;
this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
this.threadName =
@@ -204,21 +237,22 @@ public class SinkChannel implements ISinkChannel {
long sizeInBytes = tsBlock.getSizeInBytes();
int startSequenceId;
startSequenceId = nextSequenceId;
- blocked =
+ MemoryReservationResult reserveResult =
localMemoryManager
.getQueryPool()
- .reserve(
+ .reserveWithPriority(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
sizeInBytes,
- maxBytesCanReserve)
- .left;
- bufferRetainedSizeInBytes += sizeInBytes;
+ maxBytesCanReserve,
+ isHighestPriority);
+ blocked = reserveResult.getFuture();
+ bufferRetainedSizeInBytes += reserveResult.getReservedBytes();
sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock,
currentTsBlockSize));
nextSequenceId += 1;
- currentTsBlockSize = sizeInBytes;
+ currentTsBlockSize = reserveResult.getReservedBytes();
submitSendNewDataBlockEventTask(startSequenceId,
ImmutableList.of(sizeInBytes));
} finally {
@@ -434,19 +468,21 @@ public class SinkChannel implements ISinkChannel {
return;
}
// SinkChannel is opened when ShuffleSinkHandle choose it as the next
channel
- this.blocked =
+ MemoryReservationResult reserveResult =
localMemoryManager
.getQueryPool()
- .reserve(
+ .reserveWithPriority(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
- maxBytesCanReserve) // actually we only know
maxBytesCanReserve after
- // the handle is created, so we use DEFAULT here. It is ok to use
DEFAULT here because
- // at first this SinkChannel has not reserved memory.
- .left;
- this.bufferRetainedSizeInBytes = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ maxBytesCanReserve,
+ isHighestPriority); // actually we only know maxBytesCanReserve after
+ // the handle is created, so we use DEFAULT here. It is ok to use DEFAULT here because
+ // at first this SinkChannel has not reserved memory.
+ this.blocked = reserveResult.getFuture();
+ this.bufferRetainedSizeInBytes = reserveResult.getReservedBytes();
+ this.currentTsBlockSize = reserveResult.getReservedBytes();
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
index 1c6406a2ed9..1add61170a1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
+import
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
import org.apache.iotdb.db.utils.SetThreadName;
@@ -116,6 +117,8 @@ public class SourceHandle implements ISourceHandle {
*/
private boolean canGetTsBlockFromRemote = false;
+ private final boolean isHighestPriority;
+
private static final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRIC_SET
=
DataExchangeCostMetricSet.getInstance();
private static final DataExchangeCountMetricSet
DATA_EXCHANGE_COUNT_METRIC_SET =
@@ -125,6 +128,7 @@ public class SourceHandle implements ISourceHandle {
RamUsageEstimator.shallowSizeOfInstance(SourceHandle.class)
+ RamUsageEstimator.shallowSizeOfInstance(TFragmentInstanceId.class)
* 2;
+ @TestOnly
@SuppressWarnings("squid:S107")
public SourceHandle(
TEndPoint remoteEndpoint,
@@ -138,6 +142,34 @@ public class SourceHandle implements ISourceHandle {
SourceHandleListener sourceHandleListener,
IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
mppDataExchangeServiceClientManager) {
+ this(
+ remoteEndpoint,
+ remoteFragmentInstanceId,
+ localFragmentInstanceId,
+ localPlanNodeId,
+ indexOfUpstreamSinkHandle,
+ localMemoryManager,
+ executorService,
+ serde,
+ sourceHandleListener,
+ false,
+ mppDataExchangeServiceClientManager);
+ }
+
+ @SuppressWarnings("squid:S107")
+ public SourceHandle(
+ TEndPoint remoteEndpoint,
+ TFragmentInstanceId remoteFragmentInstanceId,
+ TFragmentInstanceId localFragmentInstanceId,
+ String localPlanNodeId,
+ int indexOfUpstreamSinkHandle,
+ LocalMemoryManager localMemoryManager,
+ ExecutorService executorService,
+ TsBlockSerde serde,
+ SourceHandleListener sourceHandleListener,
+ boolean isHighestPriority,
+ IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
+ mppDataExchangeServiceClientManager) {
this.remoteEndpoint = Validate.notNull(remoteEndpoint, "remoteEndpoint can
not be null.");
this.remoteFragmentInstanceId =
Validate.notNull(remoteFragmentInstanceId, "remoteFragmentInstanceId
can not be null.");
@@ -153,6 +185,7 @@ public class SourceHandle implements ISourceHandle {
this.serde = Validate.notNull(serde, "serde can not be null.");
this.sourceHandleListener =
Validate.notNull(sourceHandleListener, "sourceHandleListener can not
be null.");
+ this.isHighestPriority = isHighestPriority;
this.bufferRetainedSizeInBytes = 0L;
this.mppDataExchangeServiceClientManager =
mppDataExchangeServiceClientManager;
this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
@@ -193,19 +226,24 @@ public class SourceHandle implements ISourceHandle {
if (tsBlock == null) {
return null;
}
- long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId);
+ Long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId);
+ if (retainedSize == null) {
+ throw new IllegalStateException("Reserved data block size is null.");
+ }
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[GetTsBlockFromBuffer] sequenceId:{}, size:{}",
currSequenceId, retainedSize);
}
currSequenceId += 1;
- bufferRetainedSizeInBytes -= retainedSize;
- localMemoryManager
- .getQueryPool()
- .free(
- localFragmentInstanceId.getQueryId(),
- fullFragmentInstanceId,
- localPlanNodeId,
- retainedSize);
+ if (retainedSize > 0) {
+ bufferRetainedSizeInBytes -= retainedSize;
+ localMemoryManager
+ .getQueryPool()
+ .free(
+ localFragmentInstanceId.getQueryId(),
+ fullFragmentInstanceId,
+ localPlanNodeId,
+ retainedSize);
+ }
if (sequenceIdToTsBlock.isEmpty() && !isFinished()) {
if (LOGGER.isDebugEnabled()) {
@@ -242,18 +280,24 @@ public class SourceHandle implements ISourceHandle {
if (bytesToReserve == null) {
throw new IllegalStateException("Data block size is null.");
}
- pair =
+ MemoryReservationResult reserveResult =
localMemoryManager
.getQueryPool()
- .reserve(
+ .reserveWithPriority(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
bytesToReserve,
- maxBytesCanReserve);
- bufferRetainedSizeInBytes += bytesToReserve;
+ maxBytesCanReserve,
+ isHighestPriority);
+ pair = new Pair<>(reserveResult.getFuture(),
reserveResult.isReserveSuccess());
+ // actually reserve size is not equals raw size, update the actually reserve size to the map
+ if (reserveResult.getReservedBytes() != bytesToReserve) {
+ sequenceIdToDataBlockSize.put(endSequenceId,
reserveResult.getReservedBytes());
+ }
+ bufferRetainedSizeInBytes += reserveResult.getReservedBytes();
endSequenceId += 1;
- reservedBytes += bytesToReserve;
+ reservedBytes += reserveResult.getReservedBytes();
if (!Boolean.TRUE.equals(pair.right)) {
blockedSize = bytesToReserve;
break;
@@ -631,14 +675,16 @@ public class SourceHandle implements ISourceHandle {
if (aborted || closed) {
return;
}
- bufferRetainedSizeInBytes -= reservedBytes;
- localMemoryManager
- .getQueryPool()
- .free(
- localFragmentInstanceId.getQueryId(),
- fullFragmentInstanceId,
- localPlanNodeId,
- reservedBytes);
+ if (reservedBytes > 0) {
+ bufferRetainedSizeInBytes -= reservedBytes;
+ localMemoryManager
+ .getQueryPool()
+ .free(
+ localFragmentInstanceId.getQueryId(),
+ fullFragmentInstanceId,
+ localPlanNodeId,
+ reservedBytes);
+ }
sourceHandleListener.onFailure(SourceHandle.this, t);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index df9ac2d2f93..4cfd8dc63ec 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -161,6 +161,7 @@ public class FragmentInstanceContext extends QueryContext {
private long unclosedUnseqFileNum = 0;
private long closedSeqFileNum = 0;
private long closedUnseqFileNum = 0;
+ private boolean highestPriority = false;
public static FragmentInstanceContext createFragmentInstanceContext(
FragmentInstanceId id,
@@ -1190,6 +1191,18 @@ public class FragmentInstanceContext extends
QueryContext {
return ignoreNotExistsDevice;
}
+ /**
+ * Same flag as {@link
+ * org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis#needSetHighestPriority()}.
+ */
+ public boolean isHighestPriority() {
+ return highestPriority;
+ }
+
+ public void setHighestPriority(boolean highestPriority) {
+ this.highestPriority = highestPriority;
+ }
+
public boolean isSingleSourcePath() {
return singleSourcePath;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index 1898cbfe53c..9dfb2ffa684 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -163,6 +163,7 @@ public class FragmentInstanceManager {
dataNodeQueryContextMap,
instance.isDebug(),
instance.isVerbose()));
+ context.setHighestPriority(instance.isHighestPriority());
try {
List<PipelineDriverFactory> driverFactories =
@@ -277,6 +278,7 @@ public class FragmentInstanceManager {
instance.getSessionInfo(),
instance.isDebug(),
instance.isVerbose()));
+ context.setHighestPriority(instance.isHighestPriority());
try {
List<PipelineDriverFactory> driverFactories =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
index 3e00c845dab..64bc5022588 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
@@ -26,10 +26,8 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.exception.runtime.MemoryLeakException;
import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.tsfile.external.commons.lang3.Validate;
-import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +41,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+
/** A thread-safe memory pool. */
public class MemoryPool {
@@ -113,6 +113,31 @@ public class MemoryPool {
}
}
+ public static class MemoryReservationResult {
+ private final ListenableFuture<Void> future;
+ private final boolean reserveSuccess;
+ private final long reservedBytes;
+
+ public MemoryReservationResult(
+ ListenableFuture<Void> future, boolean reserveSuccess, long
reservedBytes) {
+ this.future = future;
+ this.reserveSuccess = reserveSuccess;
+ this.reservedBytes = reservedBytes;
+ }
+
+ public ListenableFuture<Void> getFuture() {
+ return future;
+ }
+
+ public boolean isReserveSuccess() {
+ return reserveSuccess;
+ }
+
+ public long getReservedBytes() {
+ return reservedBytes;
+ }
+ }
+
private final String id;
private final IMemoryBlock memoryBlock;
private final long maxBytesPerFragmentInstance;
@@ -224,18 +249,20 @@ public class MemoryPool {
}
/**
- * Reserve memory with bytesToReserve.
+ * Reserve memory with bytesToReserve respect priority.
*
- * @return if reserve succeed, pair.right will be true, otherwise false
+ * @return if reserve succeed, reservedBytes may be zero or equals with bytesToReserve; if reserve
+ * failed, reservedBytes must be equals with bytesToReserve
* @throws IllegalArgumentException throw exception if current query
requests more memory than can
* be allocated.
*/
- public Pair<ListenableFuture<Void>, Boolean> reserve(
+ public MemoryReservationResult reserveWithPriority(
String queryId,
String fragmentInstanceId,
String planNodeId,
long bytesToReserve,
- long maxBytesCanReserve) {
+ long maxBytesCanReserve,
+ boolean isHighestPriority) {
Validate.notNull(queryId, "queryId can not be null.");
Validate.notNull(fragmentInstanceId, "fragmentInstanceId can not be
null.");
Validate.notNull(planNodeId, "planNodeId can not be null.");
@@ -256,19 +283,21 @@ public class MemoryPool {
bytesToReserve, maxBytesCanReserve));
}
- ListenableFuture<Void> result;
if (tryReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve,
maxBytesCanReserve)) {
- result = Futures.immediateFuture(null);
- return new Pair<>(result, Boolean.TRUE);
+ return new MemoryReservationResult(immediateVoidFuture(), true,
bytesToReserve);
} else {
+ rollbackReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve);
+ if (isHighestPriority) {
+ // SHOW QUERIES: treat as success with zero bytes reserved from pool when insufficient.
+ return new MemoryReservationResult(immediateVoidFuture(), true, 0L);
+ }
LOGGER.debug(
"Blocked reserve request: {} bytes memory for planNodeId{}",
bytesToReserve, planNodeId);
- rollbackReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve);
- result =
+ ListenableFuture<Void> result =
MemoryReservationFuture.create(
queryId, fragmentInstanceId, planNodeId, bytesToReserve,
maxBytesCanReserve);
memoryReservationFutures.add((MemoryReservationFuture<Void>) result);
- return new Pair<>(result, Boolean.FALSE);
+ return new MemoryReservationResult(result, false, bytesToReserve);
}
}
@@ -299,7 +328,8 @@ public class MemoryPool {
/**
* Cancel the specified memory reservation. If the reservation has finished,
do nothing.
*
- * @param future The future returned from {@link #reserve(String, String,
String, long, long)}
+ * @param future The future returned from {@link #reserveWithPriority(String, String, String,
+ * long, long, boolean)}
* @return If the future has not complete, return the number of bytes being
reserved. Otherwise,
* return 0.
*/
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index ec1bdeed0c6..0d17b50a119 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -511,7 +511,7 @@ public class Analysis implements IAnalysis {
public boolean needSetHighestPriority() {
// if is this Statement is ShowQueryStatement, set its instances to the
highest priority, so
// that the sub-tasks of the ShowQueries instances could be executed first.
- return StatementType.SHOW_QUERIES.equals(statement.getType());
+ return statement != null &&
StatementType.SHOW_QUERIES.equals(statement.getType());
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 97070e37357..785a273238a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -129,6 +129,7 @@ public class QueryExecution implements IQueryExecution {
this.context = context;
this.planner = planner;
this.analysis = analyze(context);
+ context.setNeedSetHighestPriority(analysis.needSetHighestPriority());
this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
// We add the abort logic inside the QueryExecution.
@@ -610,7 +611,8 @@ public class QueryExecution implements IQueryExecution {
context.getResultNodeContext().getUpStreamPlanNodeId().getId(),
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
0, // Upstream of result ExchangeNode will only have one
child.
- stateMachine::transitionToFailed)
+ stateMachine::transitionToFailed,
+ context.needSetHighestPriority())
: MPPDataExchangeService.getInstance()
.getMPPDataExchangeManager()
.createSourceHandle(
@@ -619,7 +621,8 @@ public class QueryExecution implements IQueryExecution {
0,
upstreamEndPoint,
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
- stateMachine::transitionToFailed);
+ stateMachine::transitionToFailed,
+ context.needSetHighestPriority());
}
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 422454c11b6..0e32dfd097e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -2694,14 +2694,16 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
node.getUpstreamPlanNodeId().getId(),
remoteInstanceId.toThrift(),
node.getIndexOfUpstreamSinkHandle(),
- context.getInstanceContext()::failed)
+ context.getInstanceContext()::failed,
+ context.getInstanceContext().isHighestPriority())
: MPP_DATA_EXCHANGE_MANAGER.createSourceHandle(
localInstanceId.toThrift(),
node.getPlanNodeId().getId(),
node.getIndexOfUpstreamSinkHandle(),
upstreamEndPoint,
remoteInstanceId.toThrift(),
- context.getInstanceContext()::failed);
+ context.getInstanceContext()::failed,
+ context.getInstanceContext().isHighestPriority());
if (!isSameNode) {
context.addExchangeSumNum(1);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index c31a2061f49..f1bb43abed4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -454,14 +454,16 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
node.getUpstreamPlanNodeId().getId(),
remoteInstanceId.toThrift(),
node.getIndexOfUpstreamSinkHandle(),
- context.getInstanceContext()::failed)
+ context.getInstanceContext()::failed,
+ context.getInstanceContext().isHighestPriority())
: MPP_DATA_EXCHANGE_MANAGER.createSourceHandle(
localInstanceId.toThrift(),
node.getPlanNodeId().getId(),
node.getIndexOfUpstreamSinkHandle(),
upstreamEndPoint,
remoteInstanceId.toThrift(),
- context.getInstanceContext()::failed);
+ context.getInstanceContext()::failed,
+ context.getInstanceContext().isHighestPriority());
if (!isSameNode) {
context.addExchangeSumNum(1);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
index d53e29e1692..ae99aac6fba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
@@ -267,6 +267,7 @@ public class FragmentInstance implements IConsensusRequest {
fragmentInstance.hostDataNode =
hasHostDataNode ?
ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null;
fragmentInstance.isExplainAnalyze = ReadWriteIOUtils.readBool(buffer);
+ fragmentInstance.setHighestPriority(ReadWriteIOUtils.readBool(buffer));
return fragmentInstance;
}
@@ -293,6 +294,7 @@ public class FragmentInstance implements IConsensusRequest {
ThriftCommonsSerDeUtils.serializeTDataNodeLocation(hostDataNode,
outputStream);
}
ReadWriteIOUtils.write(isExplainAnalyze, outputStream);
+ ReadWriteIOUtils.write(isHighestPriority, outputStream);
return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
} catch (IOException e) {
LOGGER.error("Unexpected error occurs when serializing this
FragmentInstance.", e);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/LocalSinkChannelTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/LocalSinkChannelTest.java
index 3ec95a0205a..944094e0fbf 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/LocalSinkChannelTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/LocalSinkChannelTest.java
@@ -94,13 +94,14 @@ public class LocalSinkChannelTest {
Assert.assertFalse(localSinkChannel.isFinished());
Assert.assertEquals(11 * mockTsBlockSize,
localSinkChannel.getBufferRetainedSizeInBytes());
Mockito.verify(spyMemoryPool, Mockito.times(11))
- .reserve(
+ .reserveWithPriority(
queryId,
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
remoteFragmentInstanceId),
remotePlanNodeId,
mockTsBlockSize,
- Long.MAX_VALUE);
+ Long.MAX_VALUE,
+ false);
// Receive TsBlocks.
int numOfReceivedTsblocks = 0;
@@ -187,13 +188,14 @@ public class LocalSinkChannelTest {
Assert.assertFalse(localSinkChannel.isFinished());
Assert.assertEquals(11 * mockTsBlockSize,
localSinkChannel.getBufferRetainedSizeInBytes());
Mockito.verify(spyMemoryPool, Mockito.times(11))
- .reserve(
+ .reserveWithPriority(
queryId,
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
remoteFragmentInstanceId),
remotePlanNodeId,
mockTsBlockSize,
- Long.MAX_VALUE);
+ Long.MAX_VALUE,
+ false);
// Abort.
localSinkChannel.abort();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java
index 00c653499b0..a92eaa5d0e2 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java
@@ -22,13 +22,13 @@ package org.apache.iotdb.db.queryengine.execution.exchange;
import org.apache.iotdb.commons.memory.MemoryManager;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool;
+import
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.tsfile.external.commons.lang3.Validate;
import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.utils.Pair;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -63,15 +63,16 @@ public class SharedTsBlockQueueTest {
MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class);
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
- // reserve() returns (manualFuture, false) — simulating memory blocked
+ // reserveWithPriority() returns blocked future and reserve failure.
Mockito.when(
- mockMemoryPool.reserve(
+ mockMemoryPool.reserveWithPriority(
Mockito.anyString(),
Mockito.anyString(),
Mockito.anyString(),
Mockito.anyLong(),
- Mockito.anyLong()))
- .thenReturn(new Pair<>(manualFuture, Boolean.FALSE));
+ Mockito.anyLong(),
+ Mockito.anyBoolean()))
+ .thenReturn(new MemoryReservationResult(manualFuture, false, 1024L));
// tryCancel returns 0 — simulating future already completed (can't cancel)
Mockito.when(mockMemoryPool.tryCancel(Mockito.any())).thenReturn(0L);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/ShuffleSinkHandleTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/ShuffleSinkHandleTest.java
index 2b5d7f176be..75d204acade 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/ShuffleSinkHandleTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/ShuffleSinkHandleTest.java
@@ -106,13 +106,14 @@ public class ShuffleSinkHandleTest {
Assert.assertFalse(localSinkChannel.isFinished());
Assert.assertEquals(11 * mockTsBlockSize,
localSinkChannel.getBufferRetainedSizeInBytes());
Mockito.verify(spyMemoryPool, Mockito.times(11))
- .reserve(
+ .reserveWithPriority(
queryId,
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
remoteFragmentInstanceId),
remotePlanNodeId,
mockTsBlockSize,
- Long.MAX_VALUE);
+ Long.MAX_VALUE,
+ false);
// Abort.
shuffleSinkHandle.abort();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SourceHandleTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SourceHandleTest.java
index 85d1abefb0c..66d50675ddf 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SourceHandleTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SourceHandleTest.java
@@ -259,13 +259,14 @@ public class SourceHandleTest {
.collect(Collectors.toList()));
try {
Mockito.verify(spyMemoryPool, Mockito.timeout(10_000).times(6))
- .reserve(
+ .reserveWithPriority(
queryId,
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
localFragmentInstanceId),
localPlanNodeId,
MOCK_TSBLOCK_SIZE,
- maxBytesCanReserve);
+ maxBytesCanReserve,
+ false);
Mockito.verify(mockClient, Mockito.timeout(10_0000).times(1))
.getDataBlock(
Mockito.argThat(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java
index 327d4a34c39..b09498ad949 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java
@@ -20,11 +20,11 @@
package org.apache.iotdb.db.queryengine.execution.exchange;
import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool;
+import
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.column.TsBlockSerde;
-import org.apache.tsfile.utils.Pair;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
@@ -68,21 +68,25 @@ public class Utils {
settableFuture.get().set(null);
AtomicReference<Long> reservedBytes = new AtomicReference<>(0L);
Mockito.when(
- mockMemoryPool.reserve(
+ mockMemoryPool.reserveWithPriority(
Mockito.eq(queryId),
Mockito.eq(fragmentInstanceId),
Mockito.eq(planNodeId),
Mockito.anyLong(),
- Mockito.anyLong()))
+ Mockito.anyLong(),
+ Mockito.anyBoolean()))
.thenAnswer(
invocation -> {
long bytesToReserve = invocation.getArgument(3);
if (reservedBytes.get() + bytesToReserve <= capacityInBytes) {
- reservedBytes.updateAndGet(v -> v + (long)
invocation.getArgument(3));
- return new Pair<>(settableFuture.get(), true);
+ reservedBytes.updateAndGet(v -> v + bytesToReserve);
+ return new MemoryReservationResult(settableFuture.get(), true,
bytesToReserve);
} else {
+ if (invocation.getArgument(5)) {
+ return new MemoryReservationResult(settableFuture.get(),
true, 0L);
+ }
settableFuture.set(SettableFuture.create());
- return new Pair<>(settableFuture.get(), false);
+ return new MemoryReservationResult(settableFuture.get(),
false, bytesToReserve);
}
});
Mockito.doAnswer(
@@ -124,13 +128,17 @@ public class Utils {
public static MemoryPool createMockNonBlockedMemoryPool() {
MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class);
Mockito.when(
- mockMemoryPool.reserve(
+ mockMemoryPool.reserveWithPriority(
Mockito.anyString(),
Mockito.anyString(),
Mockito.anyString(),
Mockito.anyLong(),
- Mockito.anyLong()))
- .thenReturn(new Pair<>(immediateFuture(null), true));
+ Mockito.anyLong(),
+ Mockito.anyBoolean()))
+ .thenAnswer(
+ invocation ->
+ new MemoryReservationResult(
+ immediateFuture(null), true, invocation.getArgument(3)));
Mockito.when(
mockMemoryPool.tryReserve(
Mockito.anyString(),
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java
index ebcf2abf4e3..8614d39f20a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.execution.memory;
import org.apache.iotdb.commons.memory.MemoryManager;
+import
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Assert;
@@ -98,7 +99,9 @@ public class MemoryPoolTest {
public void testReserve() {
ListenableFuture<Void> future =
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE).left;
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE, false)
+ .getFuture();
Assert.assertTrue(future.isDone());
Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(QUERY_ID));
Assert.assertEquals(256L, pool.getReservedBytes());
@@ -108,7 +111,8 @@ public class MemoryPoolTest {
public void tesReserveZero() {
try {
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 0L,
Long.MAX_VALUE);
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 0L, Long.MAX_VALUE,
false);
Assert.fail("Expect IllegalArgumentException");
} catch (IllegalArgumentException ignore) {
}
@@ -118,7 +122,8 @@ public class MemoryPoolTest {
public void testReserveNegative() {
try {
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, -1L,
Long.MAX_VALUE);
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, -1L, Long.MAX_VALUE,
false);
Assert.fail("Expect IllegalArgumentException");
} catch (IllegalArgumentException ignore) {
}
@@ -128,7 +133,9 @@ public class MemoryPoolTest {
public void testReserveAll() {
ListenableFuture<Void> future =
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
Long.MAX_VALUE).left;
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
Long.MAX_VALUE, false)
+ .getFuture();
Assert.assertTrue(future.isDone());
Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(QUERY_ID));
Assert.assertEquals(512L, pool.getReservedBytes());
@@ -138,11 +145,15 @@ public class MemoryPoolTest {
public void testOverReserve() {
ListenableFuture<Void> future =
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE).left;
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE, false)
+ .getFuture();
Assert.assertTrue(future.isDone());
Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(QUERY_ID));
Assert.assertEquals(256L, pool.getReservedBytes());
- future = pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
513L).left;
+ future =
+ pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID,
512L, 513L, false)
+ .getFuture();
Assert.assertFalse(future.isDone());
Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(QUERY_ID));
Assert.assertEquals(256L, pool.getReservedBytes());
@@ -152,11 +163,13 @@ public class MemoryPoolTest {
public void testReserveAndFree() {
Assert.assertTrue(
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
Long.MAX_VALUE)
- .left
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
Long.MAX_VALUE, false)
+ .getFuture()
.isDone());
ListenableFuture<Void> future =
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
513L).left;
+ pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID,
512L, 513L, false)
+ .getFuture();
Assert.assertFalse(future.isDone());
Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(QUERY_ID));
Assert.assertEquals(512L, pool.getReservedBytes());
@@ -170,18 +183,22 @@ public class MemoryPoolTest {
public void testMultiReserveAndFree() {
Assert.assertTrue(
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE)
- .left
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE, false)
+ .getFuture()
.isDone());
Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(QUERY_ID));
Assert.assertEquals(256L, pool.getReservedBytes());
ListenableFuture<Void> future1 =
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
513L).left;
+ pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID,
512L, 513L, false)
+ .getFuture();
ListenableFuture<Void> future2 =
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
513L).left;
+ pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID,
512L, 513L, false)
+ .getFuture();
ListenableFuture<Void> future3 =
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
513L).left;
+ pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID,
512L, 513L, false)
+ .getFuture();
Assert.assertFalse(future1.isDone());
Assert.assertFalse(future2.isDone());
Assert.assertFalse(future3.isDone());
@@ -288,7 +305,8 @@ public class MemoryPoolTest {
pool.tryReserveForTest(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID,
512L, Long.MAX_VALUE));
ListenableFuture<Void> f =
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
512L).left;
+ pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID,
256L, 512L, false)
+ .getFuture();
Assert.assertFalse(f.isDone());
// Cancel the reservation.
Assert.assertEquals(256L, pool.tryCancel(f));
@@ -300,11 +318,76 @@ public class MemoryPoolTest {
public void testTryCancelCompletedReservation() {
ListenableFuture<Void> f =
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE).left;
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE, false)
+ .getFuture();
Assert.assertTrue(f.isDone());
// Cancel the reservation.
Assert.assertEquals(0L, pool.tryCancel(f));
Assert.assertTrue(f.isDone());
Assert.assertFalse(f.isCancelled());
}
+
+ /**
+ * Normal query: requested bytes exceed what the pool can still provide —
reserve fails (blocked
+ * future, not immediate success).
+ */
+ @Test
+ public void testReserveWithPriorityNormalQueryExceedsAvailable() {
+ MemoryReservationResult r1 =
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
Long.MAX_VALUE, false);
+ Assert.assertTrue(r1.isReserveSuccess());
+ Assert.assertEquals(512L, r1.getReservedBytes());
+ Assert.assertTrue(r1.getFuture().isDone());
+
+ MemoryReservationResult r2 =
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
Long.MAX_VALUE, false);
+ Assert.assertTrue(r2.isReserveSuccess());
+ Assert.assertEquals(512L, r2.getReservedBytes());
+ Assert.assertEquals(1024L, pool.getReservedBytes());
+
+ MemoryReservationResult r3 =
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE, false);
+ Assert.assertFalse(r3.isReserveSuccess());
+ Assert.assertEquals(256L, r3.getReservedBytes());
+ Assert.assertFalse(r3.getFuture().isDone());
+ Assert.assertEquals(1024L, pool.getReservedBytes());
+ }
+
+ /** SHOW QUERIES path: exceeds pool capacity — treated as success with zero
bytes from pool. */
+ @Test
+ public void testReserveWithPriorityShowQueriesExceedsAvailable() {
+ Assert.assertTrue(
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
Long.MAX_VALUE, false)
+ .isReserveSuccess());
+ Assert.assertTrue(
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
Long.MAX_VALUE, false)
+ .isReserveSuccess());
+ Assert.assertEquals(1024L, pool.getReservedBytes());
+
+ MemoryReservationResult r =
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE, true);
+ Assert.assertTrue(r.isReserveSuccess());
+ Assert.assertEquals(0L, r.getReservedBytes());
+ Assert.assertTrue(r.getFuture().isDone());
+ Assert.assertEquals(1024L, pool.getReservedBytes());
+ }
+
+ /** SHOW QUERIES path: pool has room — same as normal successful reserve. */
+ @Test
+ public void testReserveWithPriorityShowQueriesWithinAvailable() {
+ MemoryReservationResult r =
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE, true);
+ Assert.assertTrue(r.isReserveSuccess());
+ Assert.assertEquals(256L, r.getReservedBytes());
+ Assert.assertTrue(r.getFuture().isDone());
+ Assert.assertEquals(256L, pool.getReservedBytes());
+ }
}