This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ff1f1781dee Try to fix error msg like: 301: queue has been destroyed
ff1f1781dee is described below
commit ff1f1781dee057a9cd061598d067b343a9662ad8
Author: Jackie Tien <[email protected]>
AuthorDate: Wed Oct 9 14:49:14 2024 +0800
Try to fix error msg like: 301: queue has been destroyed
---
.../queryengine/execution/exchange/SharedTsBlockQueue.java | 10 ++++++++++
.../execution/exchange/source/LocalSourceHandle.java | 12 ++++++++++++
2 files changed, 22 insertions(+)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
index 6c556c1ae55..b3b94f66282 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
@@ -38,6 +38,7 @@ import javax.annotation.concurrent.NotThreadSafe;
import java.util.LinkedList;
import java.util.Queue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
@@ -177,6 +178,15 @@ public class SharedTsBlockQueue {
*/
public TsBlock remove() {
if (closed) {
+ // try to throw the underlying exception instead of the generic "queue has been destroyed"
+ try {
+ blocked.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(e.getCause() == null ? e :
e.getCause());
+ }
throw new IllegalStateException("queue has been destroyed");
}
TsBlock tsBlock = queue.remove();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
index 315e75cdfc5..4da89f72d07 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
@@ -36,6 +36,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
import static
com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.createFullIdFrom;
@@ -254,6 +255,17 @@ public class LocalSourceHandle implements ISourceHandle {
private void checkState() {
if (aborted) {
+ if (queue.isBlocked().isDone()) {
+ // try to throw the underlying exception instead of "Source handle is aborted."
+ try {
+ queue.isBlocked().get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(e.getCause() == null ? e :
e.getCause());
+ }
+ }
throw new IllegalStateException("Source handle is aborted.");
} else if (closed) {
throw new IllegalStateException("Source Handle is closed.");