This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch IOTDB-3677 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3c941a4dcb8403b311eac89e04d5352c4f977bf8 Author: JackieTien97 <[email protected]> AuthorDate: Tue Jun 28 11:54:30 2022 +0800 [IOTDB-3677] Fix NPE while calling QueryExecution.getBatchResult --- .../db/mpp/plan/execution/QueryExecution.java | 54 +++++++++++++--------- 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java index 7a9ab53fa4..88c17bd4d0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java @@ -281,30 +281,38 @@ public class QueryExecution implements IQueryExecution { */ @Override public Optional<TsBlock> getBatchResult() { - try { - if (resultHandle == null || resultHandle.isAborted() || resultHandle.isFinished()) { - // Once the resultHandle is finished, we should transit the state of this query to FINISHED. - // So that the corresponding cleanup work could be triggered. - logger.info("resultHandle for client is finished"); - stateMachine.transitionToFinished(); - return Optional.empty(); - } - ListenableFuture<?> blocked = resultHandle.isBlocked(); - blocked.get(); - if (!resultHandle.isFinished()) { - return Optional.of(resultHandle.receive()); - } else { - return Optional.empty(); + // iterate until we get a non-nullable TsBlock or result is finished + while (true) { + try { + if (resultHandle == null || resultHandle.isAborted() || resultHandle.isFinished()) { + // Once the resultHandle is finished, we should transit the state of this query to + // FINISHED. + // So that the corresponding cleanup work could be triggered. + logger.info("resultHandle for client is finished"); + stateMachine.transitionToFinished(); + return Optional.empty(); + } + ListenableFuture<?> blocked = resultHandle.isBlocked(); + blocked.get(); + if (!resultHandle.isFinished()) { + TsBlock res = resultHandle.receive(); + if (res == null) { + continue; + } + return Optional.of(res); + } else { + return Optional.empty(); + } + } catch (ExecutionException | CancellationException e) { + stateMachine.transitionToFailed(e); + Throwable t = e.getCause() == null ? e : e.getCause(); + throwIfUnchecked(t); + throw new RuntimeException(t); + } catch (InterruptedException e) { + stateMachine.transitionToFailed(e); + Thread.currentThread().interrupt(); + throw new RuntimeException(new SQLException("ResultSet thread was interrupted", e)); } - } catch (ExecutionException | CancellationException e) { - stateMachine.transitionToFailed(e); - Throwable t = e.getCause() == null ? e : e.getCause(); - throwIfUnchecked(t); - throw new RuntimeException(t); - } catch (InterruptedException e) { - stateMachine.transitionToFailed(e); - Thread.currentThread().interrupt(); - throw new RuntimeException(new SQLException("ResultSet thread was interrupted", e)); } }
