This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e16cf90c1e [IOTDB-3677] Fix NPE while calling
QueryExecution.getBatchResult (#6479)
e16cf90c1e is described below
commit e16cf90c1eda431066ea3203b4bd471fecd3c804
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Jun 28 14:09:01 2022 +0800
[IOTDB-3677] Fix NPE while calling QueryExecution.getBatchResult (#6479)
---
.../db/mpp/plan/execution/QueryExecution.java | 54 +++++++++++++---------
1 file changed, 31 insertions(+), 23 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 7a9ab53fa4..88c17bd4d0 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -281,30 +281,38 @@ public class QueryExecution implements IQueryExecution {
*/
@Override
public Optional<TsBlock> getBatchResult() {
- try {
- if (resultHandle == null || resultHandle.isAborted() ||
resultHandle.isFinished()) {
- // Once the resultHandle is finished, we should transit the state of
this query to FINISHED.
- // So that the corresponding cleanup work could be triggered.
- logger.info("resultHandle for client is finished");
- stateMachine.transitionToFinished();
- return Optional.empty();
- }
- ListenableFuture<?> blocked = resultHandle.isBlocked();
- blocked.get();
- if (!resultHandle.isFinished()) {
- return Optional.of(resultHandle.receive());
- } else {
- return Optional.empty();
+ // iterate until we get a non-nullable TsBlock or result is finished
+ while (true) {
+ try {
+ if (resultHandle == null || resultHandle.isAborted() ||
resultHandle.isFinished()) {
+ // Once the resultHandle is finished, we should transit the state of
this query to
+ // FINISHED.
+ // So that the corresponding cleanup work could be triggered.
+ logger.info("resultHandle for client is finished");
+ stateMachine.transitionToFinished();
+ return Optional.empty();
+ }
+ ListenableFuture<?> blocked = resultHandle.isBlocked();
+ blocked.get();
+ if (!resultHandle.isFinished()) {
+ TsBlock res = resultHandle.receive();
+ if (res == null) {
+ continue;
+ }
+ return Optional.of(res);
+ } else {
+ return Optional.empty();
+ }
+ } catch (ExecutionException | CancellationException e) {
+ stateMachine.transitionToFailed(e);
+ Throwable t = e.getCause() == null ? e : e.getCause();
+ throwIfUnchecked(t);
+ throw new RuntimeException(t);
+ } catch (InterruptedException e) {
+ stateMachine.transitionToFailed(e);
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(new SQLException("ResultSet thread was
interrupted", e));
}
- } catch (ExecutionException | CancellationException e) {
- stateMachine.transitionToFailed(e);
- Throwable t = e.getCause() == null ? e : e.getCause();
- throwIfUnchecked(t);
- throw new RuntimeException(t);
- } catch (InterruptedException e) {
- stateMachine.transitionToFailed(e);
- Thread.currentThread().interrupt();
- throw new RuntimeException(new SQLException("ResultSet thread was
interrupted", e));
}
}