This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ResultHandleNPE
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 632c52d2431e2dec8ee23f92fbe518967faf084b
Author: JackieTien97 <[email protected]>
AuthorDate: Thu Apr 27 19:07:53 2023 +0800

    Avoid cleaning up the result handle twice, which would cause an NPE
---
 .../main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java | 2 +-
 .../org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java     | 7 ++++++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index c3be5e1bbf..f92650bcde 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -125,7 +125,7 @@ public abstract class Driver implements IDriver {
         tryWithLock(
             100,
             TimeUnit.MILLISECONDS,
-            true,
+            false,
             () -> {
               // only keep doing query processing if driver state is still alive
               if (state.get() == State.ALIVE) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 199329aa93..715cf29c71 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -126,6 +126,9 @@ public class QueryExecution implements IQueryExecution {
   // We use this SourceHandle to fetch the TsBlock from it.
   private ISourceHandle resultHandle;
 
+  // used for cleaning resultHandle up exactly once
+  private final AtomicBoolean resultHandleCleanUp;
+
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
       syncInternalServiceClientManager;
 
@@ -186,6 +189,7 @@ public class QueryExecution implements IQueryExecution {
           }
         });
     this.stopped = new AtomicBoolean(false);
+    this.resultHandleCleanUp = new AtomicBoolean(false);
   }
 
   @FunctionalInterface
@@ -259,6 +263,7 @@ public class QueryExecution implements IQueryExecution {
     context.prepareForRetry();
     // re-stop
     this.stopped.compareAndSet(true, false);
+    this.resultHandleCleanUp.compareAndSet(true, false);
     // re-analyze the query
     this.analysis = analyze(rawStatement, context, partitionFetcher, 
schemaFetcher);
     // re-start the QueryExecution
@@ -407,7 +412,7 @@ public class QueryExecution implements IQueryExecution {
     // We don't need to deal with MemorySourceHandle because it doesn't register to memory pool
     // We don't need to deal with LocalSourceHandle because the SharedTsBlockQueue uses the upstream
     // FragmentInstanceId to register
-    if (resultHandle instanceof SourceHandle) {
+    if (resultHandleCleanUp.compareAndSet(false, true) && resultHandle 
instanceof SourceHandle) {
       TFragmentInstanceId fragmentInstanceId = 
resultHandle.getLocalFragmentInstanceId();
       MPPDataExchangeService.getInstance()
           .getMPPDataExchangeManager()

Reply via email to