This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3213c52114b [opt](cancel) Cancel get result future immediately if
query is cancelled (#31228)
3213c52114b is described below
commit 3213c52114b12e48eb08827264df69a90527d092
Author: zhiqiang <[email protected]>
AuthorDate: Fri Feb 23 01:37:50 2024 +0800
[opt](cancel) Cancel get result future immediately if query is cancelled
(#31228)
---
.../main/java/org/apache/doris/common/Status.java | 1 +
.../main/java/org/apache/doris/qe/Coordinator.java | 7 ++--
.../java/org/apache/doris/qe/ResultReceiver.java | 47 ++++++++++++++++++++--
3 files changed, 48 insertions(+), 7 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
index 5a7c1e9d63d..1961f9b8cc5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
@@ -24,6 +24,7 @@ import org.apache.doris.thrift.TStatusCode;
public class Status {
public static final Status OK = new Status();
public static final Status CANCELLED = new Status(TStatusCode.CANCELLED,
"Cancelled");
+ public static final Status TIMEOUT = new Status(TStatusCode.TIMEOUT,
"Timeout");
public TStatusCode getErrorCode() {
return errorCode;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 1093600a485..799c687ea0f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1330,7 +1330,8 @@ public class Coordinator implements CoordInterface {
Status status = new Status();
resultBatch = receiver.getNext(status);
if (!status.ok()) {
- LOG.warn("get next fail, need cancel. query id: {}",
DebugUtil.printId(queryId));
+ LOG.warn("Query {} coordinator get next fail, {}, need cancel.",
+ DebugUtil.printId(queryId), status.toString());
}
updateStatus(status, null /* no instance id */);
@@ -1478,7 +1479,7 @@ public class Coordinator implements CoordInterface {
private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason) {
if (null != receiver) {
- receiver.cancel();
+ receiver.cancel(cancelReason.toString());
}
if (null != pointExec) {
pointExec.cancel();
@@ -1490,7 +1491,7 @@ public class Coordinator implements CoordInterface {
private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason,
long backendId) {
if (null != receiver) {
- receiver.cancel();
+ receiver.cancel(cancelReason.toString());
}
if (null != pointExec) {
pointExec.cancel();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
index 1734d3fcfb4..a9e9740963f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
@@ -33,6 +33,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -49,6 +50,8 @@ public class ResultReceiver {
private Types.PUniqueId finstId;
private Long backendId;
private Thread currentThread;
+ private Future<InternalService.PFetchDataResult> fetchDataAsyncFuture =
null;
+ public String cancelReason = "";
public ResultReceiver(TUniqueId queryId, TUniqueId tid, Long backendId,
TNetworkAddress address, long timeoutTs) {
this.queryId =
Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build();
@@ -71,25 +74,43 @@ public class ResultReceiver {
.build();
currentThread = Thread.currentThread();
- Future<InternalService.PFetchDataResult> future
+ fetchDataAsyncFuture
=
BackendServiceProxy.getInstance().fetchDataAsync(address, request);
InternalService.PFetchDataResult pResult = null;
+
while (pResult == null) {
long currentTs = System.currentTimeMillis();
if (currentTs >= timeoutTs) {
throw new TimeoutException("query timeout, query id =
" + DebugUtil.printId(this.queryId));
}
try {
- pResult = future.get(timeoutTs - currentTs,
TimeUnit.MILLISECONDS);
+ pResult = fetchDataAsyncFuture.get(timeoutTs -
currentTs, TimeUnit.MILLISECONDS);
+ } catch (CancellationException e) {
+ LOG.warn("Future of ResultReceiver of query {} is
cancelled", DebugUtil.printId(this.queryId));
+ if (!isCancel) {
+ LOG.warn("ResultReceiver is not set to cancelled
state, this should not happen");
+ } else {
+ status.setStatus(new Status(TStatusCode.CANCELLED,
this.cancelReason));
+ return null;
+ }
+ } catch (TimeoutException e) {
+ LOG.warn("Query {} get result timeout, get result
duration {} ms",
+ DebugUtil.printId(this.queryId), (timeoutTs -
currentTs) / 1000);
+ isCancel = true;
+ status.setStatus(Status.TIMEOUT);
+ updateCancelReason("fetch data timeout");
+ return null;
} catch (InterruptedException e) {
// continue to get result
- LOG.info("future get interrupted Exception", e);
+ LOG.warn("Future of ResultReceiver of query {} got
interrupted Exception",
+ DebugUtil.printId(this.queryId), e);
if (isCancel) {
status.setStatus(Status.CANCELLED);
return null;
}
}
}
+
TStatusCode code =
TStatusCode.findByValue(pResult.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
status.setPstatus(pResult.getStatus());
@@ -150,8 +171,18 @@ public class ResultReceiver {
return rowBatch;
}
- public void cancel() {
+ private void updateCancelReason(String reason) {
+ if (this.cancelReason.isEmpty()) {
+ this.cancelReason = reason;
+ } else {
+ LOG.warn("Query {} already has cancel reason: {}, new reason {}
will be ignored",
+ DebugUtil.printId(queryId), cancelReason, reason);
+ }
+ }
+
+ public void cancel(String reason) {
isCancel = true;
+ updateCancelReason(reason);
synchronized (this) {
if (currentThread != null) {
// TODO(cmy): we cannot interrupt this thread, or we may throw
@@ -160,6 +191,14 @@ public class ResultReceiver {
// And user will lost connection to Palo
// currentThread.interrupt();
}
+ if (fetchDataAsyncFuture != null) {
+ if (fetchDataAsyncFuture.cancel(true)) {
+ LOG.info("ResultReceiver of query {} is cancelled",
DebugUtil.printId(queryId));
+ } else {
+ LOG.warn("ResultReceiver of query {} cancel failed,
typically means the future is finished",
+ DebugUtil.printId(queryId));
+ }
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]