This is an automated email from the ASF dual-hosted git repository.
huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 53ea2a0a1d1 [fix](coordinator) fix NereidsCoordinator can not
interrupt query in fe (#44795)
53ea2a0a1d1 is described below
commit 53ea2a0a1d18900a3ad5bca8d1b1d5fe1a98111c
Author: 924060929 <[email protected]>
AuthorDate: Mon Dec 2 16:40:38 2024 +0800
[fix](coordinator) fix NereidsCoordinator can not interrupt query in fe
(#44795)
Fix NereidsCoordinator cannot interrupt the query in FE; the QueryProcessor
should set the status to failed
---
.../org/apache/doris/qe/AbstractJobProcessor.java | 118 +++++++++++++++++++++
.../java/org/apache/doris/qe/JobProcessor.java | 5 +-
.../org/apache/doris/qe/NereidsCoordinator.java | 5 +-
.../org/apache/doris/qe/runtime/LoadProcessor.java | 74 ++-----------
.../doris/qe/runtime/PipelineExecutionTask.java | 2 +-
.../qe/runtime/PipelineExecutionTaskBuilder.java | 2 +-
.../apache/doris/qe/runtime/QueryProcessor.java | 18 ++--
7 files changed, 142 insertions(+), 82 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java
new file mode 100644
index 00000000000..2858de25d57
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AbstractJobProcessor.java
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.common.Status;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.qe.runtime.BackendFragmentId;
+import org.apache.doris.qe.runtime.MultiFragmentsPipelineTask;
+import org.apache.doris.qe.runtime.PipelineExecutionTask;
+import org.apache.doris.qe.runtime.SingleFragmentPipelineTask;
+import org.apache.doris.thrift.TReportExecStatusParams;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+
+/** AbstractJobProcessor */
+public abstract class AbstractJobProcessor implements JobProcessor {
+ private final Logger logger = LogManager.getLogger(getClass());
+
+ protected final CoordinatorContext coordinatorContext;
+ protected volatile Optional<PipelineExecutionTask> executionTask;
+ protected volatile Optional<Map<BackendFragmentId,
SingleFragmentPipelineTask>> backendFragmentTasks;
+
+ public AbstractJobProcessor(CoordinatorContext coordinatorContext) {
+ this.coordinatorContext = Objects.requireNonNull(coordinatorContext,
"coordinatorContext can not be null");
+ this.executionTask = Optional.empty();
+ this.backendFragmentTasks = Optional.empty();
+ }
+
+ protected abstract void doProcessReportExecStatus(
+ TReportExecStatusParams params, SingleFragmentPipelineTask
fragmentTask);
+
+ @Override
+ public final void setPipelineExecutionTask(PipelineExecutionTask
pipelineExecutionTask) {
+ Preconditions.checkArgument(pipelineExecutionTask != null,
"sqlPipelineTask can not be null");
+
+ this.executionTask = Optional.of(pipelineExecutionTask);
+ Map<BackendFragmentId, SingleFragmentPipelineTask> backendFragmentTasks
+ = buildBackendFragmentTasks(pipelineExecutionTask);
+ this.backendFragmentTasks = Optional.of(backendFragmentTasks);
+
+ afterSetPipelineExecutionTask(pipelineExecutionTask);
+ }
+
+ protected void afterSetPipelineExecutionTask(PipelineExecutionTask
pipelineExecutionTask) {}
+
+ @Override
+ public final void updateFragmentExecStatus(TReportExecStatusParams params)
{
+ SingleFragmentPipelineTask fragmentTask =
backendFragmentTasks.get().get(
+ new BackendFragmentId(params.getBackendId(),
params.getFragmentId()));
+ if (fragmentTask == null ||
!fragmentTask.processReportExecStatus(params)) {
+ return;
+ }
+
+ TUniqueId queryId = coordinatorContext.queryId;
+ Status status = new Status(params.status);
+ // for now, abort the query if we see any error except if the error is
cancelled
+ // and returned_all_results_ is true.
+ // (UpdateStatus() initiates cancellation, if it hasn't already been
initiated)
+ if (!status.ok()) {
+ if (coordinatorContext.isEos() && status.isCancelled()) {
+ logger.warn("Query {} has returned all results, fragment_id={}
instance_id={}, be={}"
+ + " is reporting failed status {}",
+ DebugUtil.printId(queryId), params.getFragmentId(),
+ DebugUtil.printId(params.getFragmentInstanceId()),
+ params.getBackendId(),
+ status.toString());
+ } else {
+ logger.warn("one instance report fail, query_id={}
fragment_id={} instance_id={}, be={},"
+ + " error message: {}",
+ DebugUtil.printId(queryId), params.getFragmentId(),
+ DebugUtil.printId(params.getFragmentInstanceId()),
+ params.getBackendId(), status.toString());
+ coordinatorContext.updateStatusIfOk(status);
+ }
+ }
+ doProcessReportExecStatus(params, fragmentTask);
+ }
+
+ private Map<BackendFragmentId, SingleFragmentPipelineTask>
buildBackendFragmentTasks(
+ PipelineExecutionTask executionTask) {
+ ImmutableMap.Builder<BackendFragmentId, SingleFragmentPipelineTask>
backendFragmentTasks
+ = ImmutableMap.builder();
+ for (Entry<Long, MultiFragmentsPipelineTask> backendTask :
executionTask.getChildrenTasks().entrySet()) {
+ Long backendId = backendTask.getKey();
+ for (Entry<Integer, SingleFragmentPipelineTask> fragmentIdToTask :
backendTask.getValue()
+ .getChildrenTasks().entrySet()) {
+ Integer fragmentId = fragmentIdToTask.getKey();
+ SingleFragmentPipelineTask fragmentTask =
fragmentIdToTask.getValue();
+ backendFragmentTasks.put(new BackendFragmentId(backendId,
fragmentId), fragmentTask);
+ }
+ }
+ return backendFragmentTasks.build();
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/JobProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/JobProcessor.java
index ede218848c7..7e4042dde3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/JobProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/JobProcessor.java
@@ -19,9 +19,12 @@ package org.apache.doris.qe;
import org.apache.doris.common.Status;
import org.apache.doris.qe.runtime.PipelineExecutionTask;
+import org.apache.doris.thrift.TReportExecStatusParams;
public interface JobProcessor {
- void setSqlPipelineTask(PipelineExecutionTask pipelineExecutionTask);
+ void setPipelineExecutionTask(PipelineExecutionTask pipelineExecutionTask);
void cancel(Status cancelReason);
+
+ void updateFragmentExecStatus(TReportExecStatusParams params);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
index a9d6becc7fa..a6f24806ed7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
@@ -233,10 +233,7 @@ public class NereidsCoordinator extends Coordinator {
@Override
public void updateFragmentExecStatus(TReportExecStatusParams params) {
- JobProcessor jobProcessor = coordinatorContext.getJobProcessor();
- if (jobProcessor instanceof LoadProcessor) {
-
coordinatorContext.asLoadProcessor().updateFragmentExecStatus(params);
- }
+ coordinatorContext.getJobProcessor().updateFragmentExecStatus(params);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
index 3a448521fca..fb32919d834 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
@@ -24,46 +24,39 @@ import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.hive.HMSTransaction;
import org.apache.doris.datasource.iceberg.IcebergTransaction;
import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.qe.AbstractJobProcessor;
import org.apache.doris.qe.CoordinatorContext;
-import org.apache.doris.qe.JobProcessor;
import org.apache.doris.qe.LoadContext;
import org.apache.doris.thrift.TFragmentInstanceReport;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
-public class LoadProcessor implements JobProcessor {
+public class LoadProcessor extends AbstractJobProcessor {
private static final Logger LOG =
LogManager.getLogger(LoadProcessor.class);
- public final CoordinatorContext coordinatorContext;
public final LoadContext loadContext;
public final long jobId;
// this latch is used to wait finish for load, for example, insert into
statement
// MarkedCountDownLatch:
// key: fragmentId, value: backendId
- private volatile Optional<PipelineExecutionTask> executionTask;
private volatile Optional<MarkedCountDownLatch<Integer, Long>> latch;
- private volatile Optional<Map<BackendFragmentId,
SingleFragmentPipelineTask>> backendFragmentTasks;
private volatile List<SingleFragmentPipelineTask> topFragmentTasks;
public LoadProcessor(CoordinatorContext coordinatorContext, long jobId) {
- this.coordinatorContext = Objects.requireNonNull(coordinatorContext,
"coordinatorContext can not be null");
+ super(coordinatorContext);
+
this.loadContext = new LoadContext();
- this.executionTask = Optional.empty();
this.latch = Optional.empty();
this.backendFragmentTasks = Optional.empty();
@@ -87,14 +80,8 @@ public class LoadProcessor implements JobProcessor {
}
@Override
- public void setSqlPipelineTask(PipelineExecutionTask
pipelineExecutionTask) {
- Preconditions.checkArgument(pipelineExecutionTask != null,
"sqlPipelineTask can not be null");
-
- this.executionTask = Optional.of(pipelineExecutionTask);
- Map<BackendFragmentId, SingleFragmentPipelineTask> backendFragmentTasks
- = buildBackendFragmentTasks(pipelineExecutionTask);
- this.backendFragmentTasks = Optional.of(backendFragmentTasks);
-
+ protected void afterSetPipelineExecutionTask(PipelineExecutionTask
pipelineExecutionTask) {
+ Map<BackendFragmentId, SingleFragmentPipelineTask>
backendFragmentTasks = this.backendFragmentTasks.get();
MarkedCountDownLatch<Integer, Long> latch = new
MarkedCountDownLatch<>(backendFragmentTasks.size());
for (BackendFragmentId backendFragmentId :
backendFragmentTasks.keySet()) {
latch.addMark(backendFragmentId.fragmentId,
backendFragmentId.backendId);
@@ -168,34 +155,9 @@ public class LoadProcessor implements JobProcessor {
return latch.get().await(timeout, unit);
}
- public void updateFragmentExecStatus(TReportExecStatusParams params) {
- SingleFragmentPipelineTask fragmentTask =
backendFragmentTasks.get().get(
- new BackendFragmentId(params.getBackendId(),
params.getFragmentId()));
- if (fragmentTask == null ||
!fragmentTask.processReportExecStatus(params)) {
- return;
- }
- TUniqueId queryId = coordinatorContext.queryId;
- Status status = new Status(params.status);
- // for now, abort the query if we see any error except if the error is
cancelled
- // and returned_all_results_ is true.
- // (UpdateStatus() initiates cancellation, if it hasn't already been
initiated)
- if (!status.ok()) {
- if (coordinatorContext.isEos() && status.isCancelled()) {
- LOG.warn("Query {} has returned all results, fragment_id={}
instance_id={}, be={}"
- + " is reporting failed status {}",
- DebugUtil.printId(queryId), params.getFragmentId(),
- DebugUtil.printId(params.getFragmentInstanceId()),
- params.getBackendId(),
- status.toString());
- } else {
- LOG.warn("one instance report fail, query_id={} fragment_id={}
instance_id={}, be={},"
- + " error message: {}",
- DebugUtil.printId(queryId), params.getFragmentId(),
- DebugUtil.printId(params.getFragmentInstanceId()),
- params.getBackendId(), status.toString());
- coordinatorContext.updateStatusIfOk(status);
- }
- }
+
+ @Override
+ protected void doProcessReportExecStatus(TReportExecStatusParams params,
SingleFragmentPipelineTask fragmentTask) {
LoadContext loadContext =
coordinatorContext.asLoadProcessor().loadContext;
if (params.isSetDeltaUrls()) {
loadContext.updateDeltaUrls(params.getDeltaUrls());
@@ -234,7 +196,7 @@ public class LoadProcessor implements JobProcessor {
if (fragmentTask.isDone()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Query {} fragment {} is marked done",
- DebugUtil.printId(queryId), params.getFragmentId());
+ DebugUtil.printId(coordinatorContext.queryId),
params.getFragmentId());
}
latch.get().markedCountDown(params.getFragmentId(),
params.getBackendId());
}
@@ -258,22 +220,6 @@ public class LoadProcessor implements JobProcessor {
}
}
- private Map<BackendFragmentId, SingleFragmentPipelineTask>
buildBackendFragmentTasks(
- PipelineExecutionTask executionTask) {
- ImmutableMap.Builder<BackendFragmentId, SingleFragmentPipelineTask>
backendFragmentTasks
- = ImmutableMap.builder();
- for (Entry<Long, MultiFragmentsPipelineTask> backendTask :
executionTask.getChildrenTasks().entrySet()) {
- Long backendId = backendTask.getKey();
- for (Entry<Integer, SingleFragmentPipelineTask> fragmentIdToTask :
backendTask.getValue()
- .getChildrenTasks().entrySet()) {
- Integer fragmentId = fragmentIdToTask.getKey();
- SingleFragmentPipelineTask fragmentTask =
fragmentIdToTask.getValue();
- backendFragmentTasks.put(new BackendFragmentId(backendId,
fragmentId), fragmentTask);
- }
- }
- return backendFragmentTasks.build();
- }
-
/*
* Check the state of backends in needCheckBackendExecStates.
* return true if all of them are OK. Otherwise, return false.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
index 8c1b9714c35..ae87d59d075 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
@@ -102,7 +102,7 @@ public class PipelineExecutionTask extends
AbstractRuntimeTask<Long, MultiFragme
@Override
public String toString() {
- return "SqlPipelineTask(\n"
+ return "PipelineExecutionTask(\n"
+ childrenTasks.allTasks()
.stream()
.map(multiFragmentsPipelineTask -> " " +
multiFragmentsPipelineTask)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
index fd00bf0e3e8..0da6f4a5fe2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
@@ -61,7 +61,7 @@ public class PipelineExecutionTaskBuilder {
backendServiceProxy,
buildMultiFragmentTasks(coordinatorContext,
backendServiceProxy, workerToFragmentsParam)
);
-
coordinatorContext.getJobProcessor().setSqlPipelineTask(pipelineExecutionTask);
+
coordinatorContext.getJobProcessor().setPipelineExecutionTask(pipelineExecutionTask);
return pipelineExecutionTask;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
index 2ec38e8cc8e..a5a5100faec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
@@ -25,13 +25,14 @@ import
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWor
import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.ResultSink;
+import org.apache.doris.qe.AbstractJobProcessor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.CoordinatorContext;
-import org.apache.doris.qe.JobProcessor;
import org.apache.doris.qe.ResultReceiver;
import org.apache.doris.qe.RowBatch;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.base.Strings;
@@ -44,24 +45,21 @@ import org.apache.thrift.TException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
-public class QueryProcessor implements JobProcessor {
+public class QueryProcessor extends AbstractJobProcessor {
private static final Logger LOG =
LogManager.getLogger(QueryProcessor.class);
// constant fields
private final long limitRows;
// mutable field
- private Optional<PipelineExecutionTask> sqlPipelineTask;
- private final CoordinatorContext coordinatorContext;
private final List<ResultReceiver> runningReceivers;
private int receiverOffset;
private long numReceivedRows;
public QueryProcessor(CoordinatorContext coordinatorContext,
List<ResultReceiver> runningReceivers) {
- this.coordinatorContext = Objects.requireNonNull(coordinatorContext,
"coordinatorContext can not be null");
+ super(coordinatorContext);
this.runningReceivers = new CopyOnWriteArrayList<>(
Objects.requireNonNull(runningReceivers, "runningReceivers can
not be null")
);
@@ -69,8 +67,6 @@ public class QueryProcessor implements JobProcessor {
this.limitRows =
coordinatorContext.fragments.get(coordinatorContext.fragments.size() - 1)
.getPlanRoot()
.getLimit();
-
- this.sqlPipelineTask = Optional.empty();
}
public static QueryProcessor build(CoordinatorContext coordinatorContext) {
@@ -109,8 +105,8 @@ public class QueryProcessor implements JobProcessor {
}
@Override
- public void setSqlPipelineTask(PipelineExecutionTask
pipelineExecutionTask) {
- this.sqlPipelineTask = Optional.ofNullable(pipelineExecutionTask);
+ protected void doProcessReportExecStatus(TReportExecStatusParams params,
SingleFragmentPipelineTask fragmentTask) {
+
}
public boolean isEos() {
@@ -178,7 +174,7 @@ public class QueryProcessor implements JobProcessor {
receiver.cancel(cancelReason);
}
- this.sqlPipelineTask.ifPresent(sqlPipelineTask -> {
+ this.executionTask.ifPresent(sqlPipelineTask -> {
for (MultiFragmentsPipelineTask fragmentsTask :
sqlPipelineTask.getChildrenTasks().values()) {
fragmentsTask.cancelExecute(cancelReason);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]