This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b90962c2fb [IOTDB-4552] Fix NPE in SourceHandle (#7467)
b90962c2fb is described below
commit b90962c2fb006f75911cb76be774ec63a35d831e
Author: Jackie Tien <[email protected]>
AuthorDate: Fri Sep 30 09:00:51 2022 +0800
[IOTDB-4552] Fix NPE in SourceHandle (#7467)
---
.../execution/exchange/MPPDataExchangeManager.java | 2 +-
.../db/mpp/execution/exchange/SourceHandle.java | 31 +++++++++++++++-------
2 files changed, 22 insertions(+), 11 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 628fe7ebb6..b9376021a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -99,7 +99,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
try {
ByteBuffer serializedTsBlock = sinkHandle.getSerializedTsBlock(i);
resp.addToTsBlocks(serializedTsBlock);
- } catch (IOException e) {
+ } catch (IllegalStateException | IOException e) {
throw new TException(e);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index 0b4f2c4a11..bfd9a017ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -407,33 +407,44 @@ public class SourceHandle implements ISourceHandle {
}
break;
} catch (Throwable e) {
+
logger.error(
"failed to get data block [{}, {}), attempt times: {}",
startSequenceId,
endSequenceId,
attempt,
e);
+
+ // reach retry max times
if (attempt == MAX_ATTEMPT_TIMES) {
- synchronized (SourceHandle.this) {
- bufferRetainedSizeInBytes -= reservedBytes;
- localMemoryManager
- .getQueryPool()
- .free(localFragmentInstanceId.getQueryId(), reservedBytes);
- sourceHandleListener.onFailure(SourceHandle.this, e);
- }
+ fail(e);
+ return;
}
+
+ // sleep some time before retrying
try {
Thread.sleep(retryIntervalInMs);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
- synchronized (SourceHandle.this) {
- sourceHandleListener.onFailure(SourceHandle.this, e);
- }
+ // if interrupted during sleeping, fast fail and don't retry any more
+ fail(e);
+ return;
}
}
}
}
}
+
+ private void fail(Throwable t) {
+ synchronized (SourceHandle.this) {
+ if (aborted || closed) {
+ return;
+ }
+ bufferRetainedSizeInBytes -= reservedBytes;
+ localMemoryManager.getQueryPool().free(localFragmentInstanceId.getQueryId(), reservedBytes);
+ sourceHandleListener.onFailure(SourceHandle.this, t);
+ }
+ }
}
class SendAcknowledgeDataBlockEventTask implements Runnable {