This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b90962c2fb [IOTDB-4552] Fix NPE in SourceHandle (#7467)
b90962c2fb is described below

commit b90962c2fb006f75911cb76be774ec63a35d831e
Author: Jackie Tien <[email protected]>
AuthorDate: Fri Sep 30 09:00:51 2022 +0800

    [IOTDB-4552] Fix NPE in SourceHandle (#7467)
---
 .../execution/exchange/MPPDataExchangeManager.java |  2 +-
 .../db/mpp/execution/exchange/SourceHandle.java    | 31 +++++++++++++++-------
 2 files changed, 22 insertions(+), 11 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 628fe7ebb6..b9376021a5 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -99,7 +99,7 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
           try {
             ByteBuffer serializedTsBlock = sinkHandle.getSerializedTsBlock(i);
             resp.addToTsBlocks(serializedTsBlock);
-          } catch (IOException e) {
+          } catch (IllegalStateException | IOException e) {
             throw new TException(e);
           }
         }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index 0b4f2c4a11..bfd9a017ea 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -407,33 +407,44 @@ public class SourceHandle implements ISourceHandle {
             }
             break;
           } catch (Throwable e) {
+
             logger.error(
                 "failed to get data block [{}, {}), attempt times: {}",
                 startSequenceId,
                 endSequenceId,
                 attempt,
                 e);
+
+            // reach retry max times
             if (attempt == MAX_ATTEMPT_TIMES) {
-              synchronized (SourceHandle.this) {
-                bufferRetainedSizeInBytes -= reservedBytes;
-                localMemoryManager
-                    .getQueryPool()
-                    .free(localFragmentInstanceId.getQueryId(), reservedBytes);
-                sourceHandleListener.onFailure(SourceHandle.this, e);
-              }
+              fail(e);
+              return;
             }
+
+            // sleep some time before retrying
             try {
               Thread.sleep(retryIntervalInMs);
             } catch (InterruptedException ex) {
               Thread.currentThread().interrupt();
-              synchronized (SourceHandle.this) {
-                sourceHandleListener.onFailure(SourceHandle.this, e);
-              }
+              // if interrupted during sleeping, fast fail and don't retry any 
more
+              fail(e);
+              return;
             }
           }
         }
       }
     }
+
+    private void fail(Throwable t) {
+      synchronized (SourceHandle.this) {
+        if (aborted || closed) {
+          return;
+        }
+        bufferRetainedSizeInBytes -= reservedBytes;
+        
localMemoryManager.getQueryPool().free(localFragmentInstanceId.getQueryId(), 
reservedBytes);
+        sourceHandleListener.onFailure(SourceHandle.this, t);
+      }
+    }
   }
 
   class SendAcknowledgeDataBlockEventTask implements Runnable {

Reply via email to