This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch RecordBlockedQuery in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 98753918218c9662dc5f66a9472985fb89fdc4ca Author: JackieTien97 <[email protected]> AuthorDate: Thu Jun 26 16:06:32 2025 +0800 Record release resource blocked too long by driver --- .../queryengine/execution/QueryStateMachine.java | 58 ++++++++++++---------- .../fragment/FragmentInstanceContext.java | 7 +++ 2 files changed, 39 insertions(+), 26 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java index a6e3acfb44d..cc0f787b014 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java @@ -29,6 +29,7 @@ import org.apache.iotdb.rpc.RpcUtils; import com.google.common.util.concurrent.ListenableFuture; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; @@ -50,8 +51,8 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.getRootCause; public class QueryStateMachine { private final StateMachine<QueryState> queryState; - private Throwable failureException; - private TSStatus failureStatus; + private final AtomicReference<Throwable> failureException = new AtomicReference<>(); + private final AtomicReference<TSStatus> failureStatus = new AtomicReference<>(); public QueryStateMachine(QueryId queryId, ExecutorService executor) { this.queryState = @@ -74,6 +75,8 @@ public class QueryStateMachine { public void transitionToQueued() { queryState.set(QUEUED); + failureException.set(null); + failureStatus.set(null); } public void transitionToPlanned() { @@ -85,7 +88,7 @@ public class QueryStateMachine { } public void transitionToPendingRetry(TSStatus failureStatus) { - this.failureStatus = failureStatus; + this.failureStatus.compareAndSet(null, failureStatus); queryState.setIf(PENDING_RETRY, currentState -> currentState == DISPATCHING); } @@ -105,10 +108,9 @@ public class QueryStateMachine { } public void transitionToCanceled(Throwable throwable, TSStatus failureStatus) { - if (transitionToDoneState(CANCELED)) { - this.failureException = throwable; - this.failureStatus = failureStatus; - } + this.failureStatus.compareAndSet(null, failureStatus); + this.failureException.compareAndSet(null, throwable); + transitionToDoneState(CANCELED); } public void transitionToAborted() { @@ -120,15 +122,13 @@ public class QueryStateMachine { } public void transitionToFailed(Throwable throwable) { - if (transitionToDoneState(FAILED)) { - this.failureException = throwable; - } + this.failureException.compareAndSet(null, throwable); + transitionToDoneState(FAILED); } public void transitionToFailed(TSStatus failureStatus) { - if (transitionToDoneState(FAILED)) { - this.failureStatus = failureStatus; - } + this.failureStatus.compareAndSet(null, failureStatus); + transitionToDoneState(FAILED); } private boolean transitionToDoneState(QueryState doneState) { @@ -139,31 +139,37 @@ public class QueryStateMachine { } public String getFailureMessage() { - if (failureException != null) { - return failureException.getMessage(); + Throwable throwable = failureException.get(); + if (throwable != null) { + return throwable.getMessage(); } return "no detailed failure reason in QueryStateMachine"; } public Throwable getFailureException() { - if (failureException == null) { + Throwable throwable = failureException.get(); + if (throwable == null) { return new IoTDBException(getFailureStatus().getMessage(), getFailureStatus().code); } else { - return failureException; + return throwable; } } public TSStatus getFailureStatus() { - if (failureStatus != null) { - return failureStatus; - } else if (failureException != null) { - Throwable t = getRootCause(failureException); - if (t instanceof IoTDBRuntimeException) { - return RpcUtils.getStatus(((IoTDBRuntimeException) t).getErrorCode(), t.getMessage()); - } else if (t instanceof IoTDBException) { - return RpcUtils.getStatus(((IoTDBException) t).getErrorCode(), t.getMessage()); + TSStatus status = failureStatus.get(); + if (status != null) { + return status; + } else { + Throwable throwable = failureException.get(); + if (throwable != null) { + Throwable t = getRootCause(throwable); + if (t instanceof IoTDBRuntimeException) { + return RpcUtils.getStatus(((IoTDBRuntimeException) t).getErrorCode(), t.getMessage()); + } else if (t instanceof IoTDBException) { + return RpcUtils.getStatus(((IoTDBException) t).getErrorCode(), t.getMessage()); + } } + return failureStatus.get(); } - return failureStatus; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 55df223db17..8b9a2a274a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -78,6 +78,8 @@ public class FragmentInstanceContext extends QueryContext { private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceContext.class); private static final long END_TIME_INITIAL_VALUE = -1L; + // wait over 5s for driver to close is abnormal + private static final long LONG_WAIT_DURATION = 5_000_000_000L; private final FragmentInstanceId id; private final FragmentInstanceStateMachine stateMachine; @@ -708,6 +710,7 @@ public class FragmentInstanceContext extends QueryContext { @SuppressWarnings("squid:S2142") public void releaseResourceWhenAllDriversAreClosed() { + long startTime = System.nanoTime(); while (true) { try { allDriversClosed.await(); @@ -718,6 +721,10 @@ public class FragmentInstanceContext extends QueryContext { "Interrupted when await on allDriversClosed, FragmentInstance Id is {}", this.getId()); } } + long duration = System.nanoTime() - startTime; + if (duration >= LONG_WAIT_DURATION) { + LOGGER.warn("Wait {}ms for all Drivers closed", duration / 1_000_000); + } releaseResource(); }
