This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new dddedf06260 Fix potential NPE when concurrently aborting a Query
dddedf06260 is described below
commit dddedf0626052132d0e74054c129ec16e3a3bb6a
Author: Liao Lanyu <[email protected]>
AuthorDate: Wed Apr 10 14:44:34 2024 +0800
Fix potential NPE when concurrently aborting a Query
---
.../execution/exchange/sink/ShuffleSinkHandle.java | 4 ++--
.../execution/exchange/source/LocalSourceHandle.java | 9 ---------
.../iotdb/db/queryengine/execution/memory/MemoryPool.java | 14 +++++++-------
3 files changed, 9 insertions(+), 18 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
index 7c31ab67e04..b9bf277b68b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
@@ -188,7 +188,6 @@ public class ShuffleSinkHandle implements ISinkHandle {
if (aborted || closed) {
return;
}
- aborted = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[StartAbortShuffleSinkHandle]");
}
@@ -208,6 +207,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
LOGGER.warn("Error occurred when try to abort channel.", firstException);
}
sinkListener.onAborted(this);
+ aborted = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[EndAbortShuffleSinkHandle]");
}
@@ -222,7 +222,6 @@ public class ShuffleSinkHandle implements ISinkHandle {
if (closed || aborted) {
return;
}
- closed = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[StartCloseShuffleSinkHandle]");
}
@@ -242,6 +241,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
LOGGER.warn("Error occurred when try to close channel.", firstException);
}
sinkListener.onFinish(this);
+ closed = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[EndCloseShuffleSinkHandle]");
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
index de37c64e460..7c6b56fd5ff 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
@@ -183,9 +183,6 @@ public class LocalSourceHandle implements ISourceHandle {
@Override
public void abort() {
- if (aborted || closed) {
- return;
- }
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[StartAbortLocalSourceHandle]");
@@ -208,9 +205,6 @@ public class LocalSourceHandle implements ISourceHandle {
@Override
public void abort(Throwable t) {
- if (aborted || closed) {
- return;
- }
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[StartAbortLocalSourceHandle]");
@@ -233,9 +227,6 @@ public class LocalSourceHandle implements ISourceHandle {
@Override
public void close() {
- if (aborted || closed) {
- return;
- }
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[StartCloseLocalSourceHandle]");
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
index 3b8adc70d44..c43ba99c63a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
@@ -223,9 +223,9 @@ public class MemoryPool {
String planNodeId,
long bytesToReserve,
long maxBytesCanReserve) {
- Validate.notNull(queryId);
- Validate.notNull(fragmentInstanceId);
- Validate.notNull(planNodeId);
+ Validate.notNull(queryId, "queryId can not be null.");
+ Validate.notNull(fragmentInstanceId, "fragmentInstanceId can not be null.");
+ Validate.notNull(planNodeId, "planNodeId can not be null.");
Validate.isTrue(
bytesToReserve > 0L && bytesToReserve <= maxBytesPerFragmentInstance,
"bytesToReserve should be in (0,maxBytesPerFI]. maxBytesPerFI: %d",
@@ -263,9 +263,9 @@ public class MemoryPool {
String planNodeId,
long bytesToReserve,
long maxBytesCanReserve) {
- Validate.notNull(queryId);
- Validate.notNull(fragmentInstanceId);
- Validate.notNull(planNodeId);
+ Validate.notNull(queryId, "queryId can not be null.");
+ Validate.notNull(fragmentInstanceId, "fragmentInstanceId can not be null.");
+ Validate.notNull(planNodeId, "planNodeId can not be null.");
Validate.isTrue(
bytesToReserve > 0L && bytesToReserve <= maxBytesPerFragmentInstance,
"bytesToReserve should be in (0,maxBytesPerFI]. maxBytesPerFI: %d",
@@ -288,10 +288,10 @@ public class MemoryPool {
*/
@SuppressWarnings("squid:S2445")
public synchronized long tryCancel(ListenableFuture<Void> future) {
+ Validate.notNull(future, "The future to be cancelled can not be null.");
// add synchronized on the future to avoid that the future is concurrently completed by
// MemoryPool.free() which may lead to memory leak.
synchronized (future) {
- Validate.notNull(future, "The future to be cancelled can not be null.");
// If the future is not a MemoryReservationFuture, it must have been completed.
if (future.isDone()) {
return 0L;