This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ResultHandleNPE in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 632c52d2431e2dec8ee23f92fbe518967faf084b Author: JackieTien97 <[email protected]> AuthorDate: Thu Apr 27 19:07:53 2023 +0800 Avoid Result Handle clean up twice which will cause NPE --- .../main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java | 2 +- .../org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java index c3be5e1bbf..f92650bcde 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java @@ -125,7 +125,7 @@ public abstract class Driver implements IDriver { tryWithLock( 100, TimeUnit.MILLISECONDS, - true, + false, () -> { // only keep doing query processing if driver state is still alive if (state.get() == State.ALIVE) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java index 199329aa93..715cf29c71 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java @@ -126,6 +126,9 @@ public class QueryExecution implements IQueryExecution { // We use this SourceHandle to fetch the TsBlock from it. private ISourceHandle resultHandle; + // used for cleaning resultHandle up exactly once + private final AtomicBoolean resultHandleCleanUp; + private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncInternalServiceClientManager; @@ -186,6 +189,7 @@ public class QueryExecution implements IQueryExecution { } }); this.stopped = new AtomicBoolean(false); + this.resultHandleCleanUp = new AtomicBoolean(false); } @FunctionalInterface @@ -259,6 +263,7 @@ public class QueryExecution implements IQueryExecution { context.prepareForRetry(); // re-stop this.stopped.compareAndSet(true, false); + this.resultHandleCleanUp.compareAndSet(true, false); // re-analyze the query this.analysis = analyze(rawStatement, context, partitionFetcher, schemaFetcher); // re-start the QueryExecution @@ -407,7 +412,7 @@ public class QueryExecution implements IQueryExecution { // We don't need to deal with MemorySourceHandle because it doesn't register to memory pool // We don't need to deal with LocalSourceHandle because the SharedTsBlockQueue uses the upstream // FragmentInstanceId to register - if (resultHandle instanceof SourceHandle) { + if (resultHandleCleanUp.compareAndSet(false, true) && resultHandle instanceof SourceHandle) { TFragmentInstanceId fragmentInstanceId = resultHandle.getLocalFragmentInstanceId(); MPPDataExchangeService.getInstance() .getMPPDataExchangeManager()
