This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 02e69d14c3b [refactor](coordinator) remove non pipeline code from coordinator (#35888)
02e69d14c3b is described below
commit 02e69d14c3b1e86ae3556033ef98c19214a3799e
Author: yiguolei <[email protected]>
AuthorDate: Wed Jun 5 16:02:36 2024 +0800
[refactor](coordinator) remove non pipeline code from coordinator (#35888)
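This makes the pipeline engine the only execution path in Coordinator: every
"if (enablePipelineEngine) { ... } else { ... }" branch collapses to its
pipeline arm. A minimal sketch of the dispatch change (method bodies elided;
names as in the diff below):

    // Before: two execution paths selected by a session flag.
    if (enablePipelineEngine) {
        sendPipelineCtx();   // pipeline path, kept
    } else {
        sendFragment();      // legacy instance-based path, deleted (~800 lines)
    }

    // After: pipeline only.
    sendPipelineCtx();

The same collapse is applied in getBeToInstancesNum, cancelRemoteFragmentsAsync,
updateFragmentExecStatus, checkBackendState, getFragmentInstanceInfos and
getInvolvedBackends; the now-unused BackendExecState/BackendExecStates helpers
and the non-pipeline waitRpc are removed as well.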
Co-authored-by: yiguolei <[email protected]>
---
.../main/java/org/apache/doris/qe/Coordinator.java | 903 +++------------------
1 file changed, 101 insertions(+), 802 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 94e7d59625a..1f14b6f3b8e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -33,7 +33,6 @@ import org.apache.doris.common.profile.ExecutionProfile;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ListUtil;
-import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.datasource.FileQueryScanNode;
@@ -88,7 +87,6 @@ import org.apache.doris.thrift.TDescriptorTable;
import org.apache.doris.thrift.TErrorTabletInfo;
import org.apache.doris.thrift.TEsScanRange;
import org.apache.doris.thrift.TExecPlanFragmentParams;
-import org.apache.doris.thrift.TExecPlanFragmentParamsList;
import org.apache.doris.thrift.TExternalScanRange;
import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TFileScanRangeParams;
@@ -213,15 +211,9 @@ public class Coordinator implements CoordInterface {
private final List<PlanFragment> fragments;
- private Map<Long, BackendExecStates> beToExecStates = Maps.newHashMap();
private Map<Long, PipelineExecContexts> beToPipelineExecCtxs = Maps.newHashMap();
- // backend execute state
- private final List<BackendExecState> backendExecStates = Lists.newArrayList();
private final Map<Pair<Integer, Long>, PipelineExecContext> pipelineExecContexts = new HashMap<>();
- // backend which state need to be checked when joining this coordinator.
- // It is supposed to be the subset of backendExecStates.
- private final List<BackendExecState> needCheckBackendExecStates = Lists.newArrayList();
private final List<PipelineExecContext> needCheckPipelineExecContexts = Lists.newArrayList();
private ResultReceiver receiver;
protected final List<ScanNode> scanNodes;
@@ -501,14 +493,12 @@ public class Coordinator implements CoordInterface {
public void clearExportStatus() {
lock.lock();
try {
- this.backendExecStates.clear();
this.pipelineExecContexts.clear();
this.queryStatus.updateStatus(TStatusCode.OK, "");
if (this.exportFiles == null) {
this.exportFiles = Lists.newArrayList();
}
this.exportFiles.clear();
- this.needCheckBackendExecStates.clear();
this.needCheckPipelineExecContexts.clear();
} finally {
lock.unlock();
@@ -525,16 +515,9 @@ public class Coordinator implements CoordInterface {
public Map<String, Integer> getBeToInstancesNum() {
Map<String, Integer> result = Maps.newTreeMap();
- if (enablePipelineEngine) {
- for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
- result.put(ctxs.brpcAddr.hostname.concat(":").concat("" + ctxs.brpcAddr.port),
- ctxs.getInstanceNumber());
- }
- } else {
- for (BackendExecStates states : beToExecStates.values()) {
- result.put(states.brpcAddr.hostname.concat(":").concat("" + states.brpcAddr.port),
- states.states.size());
- }
+ for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
+ result.put(ctxs.brpcAddr.hostname.concat(":").concat("" + ctxs.brpcAddr.port),
+ ctxs.getInstanceNumber());
}
return result;
}
@@ -736,163 +719,7 @@ public class Coordinator implements CoordInterface {
}
updateProfileIfPresent(profile -> profile.setAssignFragmentTime());
- if (enablePipelineEngine) {
- sendPipelineCtx();
- } else {
- sendFragment();
- }
- }
-
- /**
- * The logic for sending query plan fragments is as follows:
- * First, plan fragments are dependent. According to the order in "fragments" list,
- * it must be ensured that on the BE side, the next fragment instance can be executed
- * only after the previous fragment instance is ready,
- * <p>
- * In the previous logic, we will send fragment instances in sequence through RPC,
- * and will wait for the RPC of the previous fragment instance to return successfully
- * before sending the next one. But for some complex queries, this may lead to too many RPCs.
- * <p>
- * The optimized logic is as follows:
- * 1. If the number of fragment instance is <= 2, the original logic is still used
- * to complete the sending of fragments through at most 2 RPCs.
- * 2. If the number of fragment instance is >= 3, first group all fragments by BE,
- * and send all fragment instances to the corresponding BE node through the FIRST rpc,
- * but these fragment instances will only perform the preparation phase but will not be actually executed.
- * After that, the execution logic of all fragment instances is started through the SECOND RPC.
- * <p>
- * After optimization, a query on a BE node will only send two RPCs at most.
- * Thereby reducing the "send fragment timeout" error caused by too many RPCs and BE unable to process in time.
- *
- * @throws TException
- * @throws RpcException
- * @throws UserException
- */
- private void sendFragment() throws TException, RpcException, UserException {
- lock();
- try {
- Multiset<TNetworkAddress> hostCounter = HashMultiset.create();
- for (FragmentExecParams params : fragmentExecParamsMap.values()) {
- for (FInstanceExecParam fi : params.instanceExecParams) {
- hostCounter.add(fi.host);
- }
- }
-
- int backendIdx = 0;
- int profileFragmentId = 0;
- long memoryLimit = queryOptions.getMemLimit();
- Map<Long, Integer> numSinkOnBackend = Maps.newHashMap();
- beToExecStates.clear();
- // If #fragments >=2, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start,
- // else use exec_plan_fragments directly.
- // we choose #fragments >=2 because in some cases
- // we need ensure that A fragment is already prepared to receive data before B fragment sends data.
- // For example: select * from numbers("number"="10") will generate ExchangeNode and
- // TableValuedFunctionScanNode, we should ensure TableValuedFunctionScanNode does
- // not send data until ExchangeNode is ready to receive.
- boolean twoPhaseExecution = fragments.size() >= 2;
- for (PlanFragment fragment : fragments) {
- FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId());
-
- // 1. set up exec states
- int instanceNum = params.instanceExecParams.size();
- Preconditions.checkState(instanceNum > 0);
- List<TExecPlanFragmentParams> tParams = params.toThrift(backendIdx);
-
- // 2. update memory limit for colocate join
- if (colocateFragmentIds.contains(fragment.getFragmentId().asInt())) {
- int rate = Math.min(Config.query_colocate_join_memory_limit_penalty_factor, instanceNum);
- long newMemory = memoryLimit / rate;
- // TODO(zxy): The meaning of mem limit in query_options has become the real once query mem limit.
- // The logic to modify mem_limit here needs to be modified or deleted.
- for (TExecPlanFragmentParams tParam : tParams) {
- tParam.query_options.setMemLimit(newMemory);
- }
- }
-
- boolean needCheckBackendState = false;
- if (queryOptions.getQueryType() == TQueryType.LOAD && profileFragmentId == 0) {
- // this is a load process, and it is the first fragment.
- // we should add all BackendExecState of this fragment to needCheckBackendExecStates,
- // so that we can check these backends' state when joining this Coordinator
- needCheckBackendState = true;
- }
-
- // 3. group BackendExecState by BE. So that we can use one RPC to send all fragment instances of a BE.
- int instanceId = 0;
- for (TExecPlanFragmentParams tParam : tParams) {
- BackendExecState execState =
- new BackendExecState(fragment.getFragmentId(), instanceId++,
- tParam, this.addressToBackendID, executionProfile);
- // Each tParam will set the total number of Fragments that need to be executed on the same BE,
- // and the BE will determine whether all Fragments have been executed based on this information.
- // Notice. load fragment has a small probability that FragmentNumOnHost is 0, for unknown reasons.
- tParam.setFragmentNumOnHost(hostCounter.count(execState.address));
- tParam.setBackendId(execState.backend.getId());
- tParam.setNeedWaitExecutionTrigger(twoPhaseExecution);
-
- backendExecStates.add(execState);
- if (needCheckBackendState) {
- needCheckBackendExecStates.add(execState);
- if (LOG.isDebugEnabled()) {
- LOG.debug("add need check backend {} for fragment,
{} job: {}",
- execState.backend.getId(),
fragment.getFragmentId().asInt(), jobId);
- }
- }
-
- BackendExecStates states = beToExecStates.get(execState.backend.getId());
- if (states == null) {
- states = new BackendExecStates(execState.backend.getId(), execState.brpcAddress,
- twoPhaseExecution, execState.backend.getProcessEpoch());
- beToExecStates.putIfAbsent(execState.backend.getId(), states);
- }
- states.addState(execState);
- if (tParam.getFragment().getOutputSink() != null
- && tParam.getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) {
- numSinkOnBackend.merge(execState.backend.getId(), 1, Integer::sum);
- }
- ++backendIdx;
- }
- int loadStreamPerNode = 1;
- if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
- loadStreamPerNode = ConnectContext.get().getSessionVariable().getLoadStreamPerNode();
- }
- for (TExecPlanFragmentParams tParam : tParams) {
- if (tParam.getFragment().getOutputSink() != null
- && tParam.getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) {
- tParam.setLoadStreamPerNode(loadStreamPerNode);
- tParam.setTotalLoadStreams(numSinkOnBackend.size() * loadStreamPerNode);
- tParam.setNumLocalSink(numSinkOnBackend.get(tParam.getBackendId()));
- LOG.info("num local sink for backend {} is {}", tParam.getBackendId(),
- numSinkOnBackend.get(tParam.getBackendId()));
- }
- }
- profileFragmentId += 1;
- } // end for fragments
-
- // 4. send and wait fragments rpc
- List<Triple<BackendExecStates, BackendServiceProxy, Future<InternalService.PExecPlanFragmentResult>>>
- futures = Lists.newArrayList();
-
- for (BackendExecStates states : beToExecStates.values()) {
- states.unsetFields();
- BackendServiceProxy proxy = BackendServiceProxy.getInstance();
- futures.add(ImmutableTriple.of(states, proxy, states.execRemoteFragmentsAsync(proxy)));
- }
- waitRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send fragments");
-
- if (twoPhaseExecution) {
- // 5. send and wait execution start rpc
- futures.clear();
- for (BackendExecStates states : beToExecStates.values()) {
- BackendServiceProxy proxy = BackendServiceProxy.getInstance();
- futures.add(ImmutableTriple.of(states, proxy, states.execPlanFragmentStartAsync(proxy)));
- }
- waitRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send execution start");
- }
- } finally {
- unlock();
- }
+ sendPipelineCtx();
}
private void sendPipelineCtx() throws TException, RpcException, UserException {
@@ -1059,83 +886,6 @@ public class Coordinator implements CoordInterface {
}
}
- private void waitRpc(List<Triple<BackendExecStates, BackendServiceProxy, Future<PExecPlanFragmentResult>>> futures,
- long leftTimeMs,
- String operation) throws RpcException, UserException {
- if (leftTimeMs <= 0) {
- long currentTimeMillis = System.currentTimeMillis();
- long elapsed = (currentTimeMillis - timeoutDeadline) / 1000 + queryOptions.getExecutionTimeout();
- String msg = String.format(
- "timeout before waiting %s rpc, query timeout:%d, already elapsed:%d, left for this:%d",
- operation, queryOptions.getExecutionTimeout(), elapsed, leftTimeMs);
- LOG.warn("Query {} {}", DebugUtil.printId(queryId), msg);
- if (!queryOptions.isSetExecutionTimeout() || !queryOptions.isSetQueryTimeout()) {
- LOG.warn("Query {} does not set timeout info, execution timeout: is_set:{}, value:{}"
- + ", query timeout: is_set:{}, value: {}, "
- + "coordinator timeout deadline {}, cur time millis: {}",
- DebugUtil.printId(queryId),
- queryOptions.isSetExecutionTimeout(), queryOptions.getExecutionTimeout(),
- queryOptions.isSetQueryTimeout(), queryOptions.getQueryTimeout(),
- timeoutDeadline, currentTimeMillis);
- }
- throw new UserException(msg);
- }
-
- long timeoutMs = Math.min(leftTimeMs, Config.remote_fragment_exec_timeout_ms);
- for (Triple<BackendExecStates, BackendServiceProxy, Future<PExecPlanFragmentResult>> triple : futures) {
- TStatusCode code;
- String errMsg = null;
- Exception exception = null;
-
- try {
- PExecPlanFragmentResult result = triple.getRight().get(timeoutMs, TimeUnit.MILLISECONDS);
- code = TStatusCode.findByValue(result.getStatus().getStatusCode());
- if (code != TStatusCode.OK) {
- if (!result.getStatus().getErrorMsgsList().isEmpty()) {
- errMsg = result.getStatus().getErrorMsgsList().get(0);
- } else {
- errMsg = operation + " failed. backend id: " + triple.getLeft().beId;
- }
- }
- } catch (ExecutionException e) {
- exception = e;
- code = TStatusCode.THRIFT_RPC_ERROR;
- triple.getMiddle().removeProxy(triple.getLeft().brpcAddr);
- } catch (InterruptedException e) {
- exception = e;
- code = TStatusCode.INTERNAL_ERROR;
- } catch (TimeoutException e) {
- exception = e;
- errMsg = String.format(
- "timeout when waiting for %s rpc, query timeout:%d, left
timeout for this operation:%d",
- operation, queryOptions.getExecutionTimeout(), timeoutMs /
1000);
- LOG.warn("Query {} {}", DebugUtil.printId(queryId), errMsg);
- code = TStatusCode.TIMEOUT;
- }
-
- if (code != TStatusCode.OK) {
- if (exception != null && errMsg == null) {
- errMsg = operation + " failed. " + exception.getMessage();
- }
- queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg);
- cancelInternal(queryStatus);
- switch (code) {
- case TIMEOUT:
- MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname)
- .increase(1L);
- throw new RpcException(triple.getLeft().brpcAddr.hostname, errMsg, exception);
- case THRIFT_RPC_ERROR:
- MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname)
- .increase(1L);
- SimpleScheduler.addToBlacklist(triple.getLeft().beId, errMsg);
- throw new RpcException(triple.getLeft().brpcAddr.hostname, errMsg, exception);
- default:
- throw new UserException(errMsg, exception);
- }
- }
- }
- }
-
private void waitPipelineRpc(List<Triple<PipelineExecContexts, BackendServiceProxy,
Future<PExecPlanFragmentResult>>> futures, long leftTimeMs,
String operation) throws RpcException, UserException {
@@ -1413,56 +1163,30 @@ public class Coordinator implements CoordInterface {
try {
lock();
-
- if (queryOptions.isEnablePipelineEngine()) {
- for (PipelineExecContext pipelineExecContext : pipelineExecContexts.values()) {
- Backend be = curBeMap.get(pipelineExecContext.backend.getId());
- if (be == null || !be.isAlive()) {
- Status errorStatus = new Status(TStatusCode.CANCELLED,
- "Backend {} not exists or dead, query {} should be cancelled",
- pipelineExecContext.backend.toString(), DebugUtil.printId(queryId));
- LOG.warn(errorStatus.getErrorMsg());
- return errorStatus;
- }
-
- // Backend process epoch changed, indicates that this be restarts, query should be cancelled.
- // Check zero since during upgrading, older version oplog will not persistent be start time
- // so newer version follower will get zero epoch when replaying oplog or snapshot
- if (pipelineExecContext.beProcessEpoch != be.getProcessEpoch() && be.getProcessEpoch() != 0) {
- Status errorStatus = new Status(TStatusCode.CANCELLED,
- "Backend process epoch changed, previous {} now {}, "
- + "means this be has already restarted, should cancel this coordinator,"
- + "query id {}", pipelineExecContext.beProcessEpoch, be.getProcessEpoch(),
- DebugUtil.printId(queryId));
- LOG.warn(errorStatus.getErrorMsg());
- return errorStatus;
- } else if (be.getProcessEpoch() == 0) {
- LOG.warn("Backend {} has zero process epoch, maybe we are upgrading cluster?",
- be.toString());
- }
- }
- } else {
- // beToExecStates will be updated only in non-pipeline query.
- for (BackendExecStates beExecState : beToExecStates.values()) {
- Backend be = curBeMap.get(beExecState.beId);
- if (be == null || !be.isAlive()) {
- Status errorStatus = new Status(TStatusCode.CANCELLED,
- "Backend {} not exists or dead, query {}
should be cancelled.",
- beExecState.beId, DebugUtil.printId(queryId));
- LOG.warn(errorStatus.getErrorMsg());
- return errorStatus;
- }
-
- if (beExecState.beProcessEpoch != be.getProcessEpoch() &&
be.getProcessEpoch() != 0) {
- Status errorStatus = new Status(TStatusCode.CANCELLED,
- "Process epoch changed, previous {} now {},
means this be has already restarted,"
- + "should cancel this coordinator, query id
{}",
- beExecState.beProcessEpoch,
be.getProcessEpoch(), DebugUtil.printId(queryId));
- LOG.warn(errorStatus.getErrorMsg());
- return errorStatus;
- } else if (be.getProcessEpoch() == 0) {
- LOG.warn("Backend {} has zero process epoch, maybe we
are upgrading cluster?", be.toString());
- }
+ for (PipelineExecContext pipelineExecContext : pipelineExecContexts.values()) {
+ Backend be = curBeMap.get(pipelineExecContext.backend.getId());
+ if (be == null || !be.isAlive()) {
+ Status errorStatus = new Status(TStatusCode.CANCELLED,
+ "Backend {} not exists or dead, query {} should be cancelled",
+ pipelineExecContext.backend.toString(), DebugUtil.printId(queryId));
+ LOG.warn(errorStatus.getErrorMsg());
+ return errorStatus;
+ }
+
+ // Backend process epoch changed, indicates that this be restarts, query should be cancelled.
+ // Check zero since during upgrading, older version oplog will not persistent be start time
+ // so newer version follower will get zero epoch when replaying oplog or snapshot
+ if (pipelineExecContext.beProcessEpoch != be.getProcessEpoch() && be.getProcessEpoch() != 0) {
+ Status errorStatus = new Status(TStatusCode.CANCELLED,
+ "Backend process epoch changed, previous {} now {}, "
+ + "means this be has already restarted, should cancel this coordinator,"
+ + "query id {}", pipelineExecContext.beProcessEpoch, be.getProcessEpoch(),
+ DebugUtil.printId(queryId));
+ LOG.warn(errorStatus.getErrorMsg());
+ return errorStatus;
+ } else if (be.getProcessEpoch() == 0) {
+ LOG.warn("Backend {} has zero process epoch, maybe we are upgrading cluster?",
+ be.toString());
}
}
@@ -1538,14 +1262,8 @@ public class Coordinator implements CoordInterface {
}
private void cancelRemoteFragmentsAsync(Status cancelReason) {
- if (enablePipelineEngine) {
- for (PipelineExecContexts ctx : beToPipelineExecCtxs.values()) {
- ctx.cancelQuery(cancelReason);
- }
- } else {
- for (BackendExecStates backendExecState : beToExecStates.values()) {
- backendExecState.cancelQuery(cancelReason);
- }
+ for (PipelineExecContexts ctx : beToPipelineExecCtxs.values()) {
+ ctx.cancelQuery(cancelReason);
}
}
@@ -2546,201 +2264,63 @@ public class Coordinator implements CoordInterface {
// update job progress from BE
public void updateFragmentExecStatus(TReportExecStatusParams params) {
- if (enablePipelineXEngine) {
- PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
- if (ctx == null || !ctx.updatePipelineStatus(params)) {
- return;
- }
-
- Status status = new Status(params.status);
- // for now, abort the query if we see any error except if the error is cancelled
- // and returned_all_results_ is true.
- // (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
- if (!status.ok()) {
- if (returnedAllResults && status.isCancelled()) {
- LOG.warn("Query {} has returned all results, fragment_id={} instance_id={}, be={}"
- + " is reporting failed status {}",
- DebugUtil.printId(queryId), params.getFragmentId(),
- DebugUtil.printId(params.getFragmentInstanceId()),
- params.getBackendId(),
- status.toString());
- } else {
- LOG.warn("one instance report fail, query_id={} fragment_id={} instance_id={}, be={},"
- + " error message: {}",
- DebugUtil.printId(queryId), params.getFragmentId(),
- DebugUtil.printId(params.getFragmentInstanceId()),
- params.getBackendId(), status.toString());
- updateStatus(status);
- }
- }
- if (params.isSetDeltaUrls()) {
- updateDeltas(params.getDeltaUrls());
- }
- if (params.isSetLoadCounters()) {
- updateLoadCounters(params.getLoadCounters());
- }
- if (params.isSetTrackingUrl()) {
- trackingUrl = params.getTrackingUrl();
- }
- if (params.isSetExportFiles()) {
- updateExportFiles(params.getExportFiles());
- }
- if (params.isSetCommitInfos()) {
- updateCommitInfos(params.getCommitInfos());
- }
- if (params.isSetErrorTabletInfos()) {
- updateErrorTabletInfos(params.getErrorTabletInfos());
- }
- if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) {
- hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
- }
- if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc != null) {
- icebergCommitDataFunc.accept(params.getIcebergCommitDatas());
- }
-
- if (ctx.done) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Query {} fragment {} is marked done",
- DebugUtil.printId(queryId), ctx.fragmentId);
- }
- fragmentsDoneLatch.markedCountDown(params.getFragmentId(), params.getBackendId());
- }
- } else if (enablePipelineEngine) {
- PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
- if (ctx == null || !ctx.updatePipelineStatus(params)) {
- return;
- }
-
- Status status = new Status(params.status);
- // for now, abort the query if we see any error except if the error is cancelled
- // and returned_all_results_ is true.
- // (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
- if (!status.ok()) {
- if (returnedAllResults && status.isCancelled()) {
- LOG.warn("Query {} has returned all results, fragment_id={} instance_id={}, be={}"
- + " is reporting failed status {}",
- DebugUtil.printId(queryId), params.getFragmentId(),
- DebugUtil.printId(params.getFragmentInstanceId()),
- params.getBackendId(),
- status.toString());
- } else {
- LOG.warn("one instance report fail, query_id={}
fragment_id={} instance_id={}, be={},"
- + " error message: {}",
- DebugUtil.printId(queryId), params.getFragmentId(),
- DebugUtil.printId(params.getFragmentInstanceId()),
- params.getBackendId(), status.toString());
- updateStatus(status);
- }
- }
+ PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
+ if (ctx == null || !ctx.updatePipelineStatus(params)) {
+ return;
+ }
- // params.isDone() should be promised.
- // There are some periodic reports during the load process,
- // and the reports from the intermediate process may be concurrent with the last report.
- // The last report causes the counter to decrease to zero,
- // but it is possible that the report without commit-info triggered the commit operation,
- // resulting in the data not being published.
- if (ctx.fragmentInstancesMap.get(params.fragment_instance_id) && params.isDone()) {
- if (params.isSetDeltaUrls()) {
- updateDeltas(params.getDeltaUrls());
- }
- if (params.isSetLoadCounters()) {
- updateLoadCounters(params.getLoadCounters());
- }
- if (params.isSetTrackingUrl()) {
- trackingUrl = params.getTrackingUrl();
- }
- if (params.isSetExportFiles()) {
- updateExportFiles(params.getExportFiles());
- }
- if (params.isSetCommitInfos()) {
- updateCommitInfos(params.getCommitInfos());
- }
- if (params.isSetErrorTabletInfos()) {
- updateErrorTabletInfos(params.getErrorTabletInfos());
- }
- if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) {
- hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
- }
- if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc != null) {
- icebergCommitDataFunc.accept(params.getIcebergCommitDatas());
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Query {} instance {} is marked done",
- DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()));
- }
- instancesDoneLatch.markedCountDown(params.getFragmentInstanceId(), -1L);
+ Status status = new Status(params.status);
+ // for now, abort the query if we see any error except if the error is cancelled
+ // and returned_all_results_ is true.
+ // (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
+ if (!status.ok()) {
+ if (returnedAllResults && status.isCancelled()) {
+ LOG.warn("Query {} has returned all results, fragment_id={}
instance_id={}, be={}"
+ + " is reporting failed status {}",
+ DebugUtil.printId(queryId), params.getFragmentId(),
+ DebugUtil.printId(params.getFragmentInstanceId()),
+ params.getBackendId(),
+ status.toString());
} else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Query {} instance {} is not marked done",
- DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()));
- }
- }
- } else {
- if (params.backend_num >= backendExecStates.size()) {
- LOG.warn("Query {} instance {} unknown backend number: {},
expected less than: {}",
- DebugUtil.printId(queryId),
DebugUtil.printId(params.getFragmentInstanceId()),
- params.backend_num, backendExecStates.size());
- return;
- }
- BackendExecState execState =
backendExecStates.get(params.backend_num);
- if (!execState.updateInstanceStatus(params)) {
- // Has to return here, to avoid out of order report messages. For example,
- // the first message is done, then we update commit messages, but the new
- // message is running, then we will also update commit messages. It will
- // lead to data corrupt.
- return;
- }
-
- Status status = new Status(params.status);
- // for now, abort the query if we see any error except if the error is cancelled
- // and returned_all_results_ is true.
- // (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
- if (!status.ok()) {
- if (status.isCancelled() && returnedAllResults) {
- LOG.warn("Query {} has returned all results, its instance
{} is reporting failed status {}",
- DebugUtil.printId(queryId),
DebugUtil.printId(params.getFragmentInstanceId()),
- status.toString());
- } else {
- LOG.warn("Instance {} of query {} report failed status,
error msg: {}",
- DebugUtil.printId(queryId),
DebugUtil.printId(params.getFragmentInstanceId()),
- status.toString());
- updateStatus(status);
- }
+ LOG.warn("one instance report fail, query_id={} fragment_id={}
instance_id={}, be={},"
+ + " error message: {}",
+ DebugUtil.printId(queryId), params.getFragmentId(),
+ DebugUtil.printId(params.getFragmentInstanceId()),
+ params.getBackendId(), status.toString());
+ updateStatus(status);
}
+ }
+ if (params.isSetDeltaUrls()) {
+ updateDeltas(params.getDeltaUrls());
+ }
+ if (params.isSetLoadCounters()) {
+ updateLoadCounters(params.getLoadCounters());
+ }
+ if (params.isSetTrackingUrl()) {
+ trackingUrl = params.getTrackingUrl();
+ }
+ if (params.isSetExportFiles()) {
+ updateExportFiles(params.getExportFiles());
+ }
+ if (params.isSetCommitInfos()) {
+ updateCommitInfos(params.getCommitInfos());
+ }
+ if (params.isSetErrorTabletInfos()) {
+ updateErrorTabletInfos(params.getErrorTabletInfos());
+ }
+ if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) {
+ hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
+ }
+ if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc != null) {
+ icebergCommitDataFunc.accept(params.getIcebergCommitDatas());
+ }
- // params.isDone() should be promised.
- // There are some periodic reports during the load process,
- // and the reports from the intermediate process may be concurrent with the last report.
- // The last report causes the counter to decrease to zero,
- // but it is possible that the report without commit-info triggered the commit operation,
- // resulting in the data not being published.
- if (execState.done && params.isDone()) {
- if (params.isSetDeltaUrls()) {
- updateDeltas(params.getDeltaUrls());
- }
- if (params.isSetLoadCounters()) {
- updateLoadCounters(params.getLoadCounters());
- }
- if (params.isSetTrackingUrl()) {
- trackingUrl = params.getTrackingUrl();
- }
- if (params.isSetExportFiles()) {
- updateExportFiles(params.getExportFiles());
- }
- if (params.isSetCommitInfos()) {
- updateCommitInfos(params.getCommitInfos());
- }
- if (params.isSetErrorTabletInfos()) {
- updateErrorTabletInfos(params.getErrorTabletInfos());
- }
- if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) {
- hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
- }
- if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc != null) {
- icebergCommitDataFunc.accept(params.getIcebergCommitDatas());
- }
- instancesDoneLatch.markedCountDown(params.getFragmentInstanceId(), -1L);
+ if (ctx.done) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Query {} fragment {} is marked done",
+ DebugUtil.printId(queryId), ctx.fragmentId);
}
+ fragmentsDoneLatch.markedCountDown(params.getFragmentId(), params.getBackendId());
}
if (params.isSetLoadedRows() && jobId != -1) {
@@ -2800,21 +2380,11 @@ public class Coordinator implements CoordInterface {
* return true if all of them are OK. Otherwise, return false.
*/
private boolean checkBackendState() {
- if (enablePipelineEngine) {
- for (PipelineExecContext ctx : needCheckPipelineExecContexts) {
- if (!ctx.isBackendStateHealthy()) {
- queryStatus = new Status(TStatusCode.INTERNAL_ERROR, "backend "
- + ctx.backend.getId() + " is down");
- return false;
- }
- }
- } else {
- for (BackendExecState backendExecState : needCheckBackendExecStates) {
- if (!backendExecState.isBackendStateHealthy()) {
- queryStatus = new Status(TStatusCode.INTERNAL_ERROR, "backend "
- + backendExecState.backend.getId() + " is down");
- return false;
- }
+ for (PipelineExecContext ctx : needCheckPipelineExecContexts) {
+ if (!ctx.isBackendStateHealthy()) {
+ queryStatus = new Status(TStatusCode.INTERNAL_ERROR, "backend "
+ + ctx.backend.getId() + " is down");
+ return false;
}
}
return true;
@@ -3128,85 +2698,6 @@ public class Coordinator implements CoordInterface {
private final BucketShuffleJoinController bucketShuffleJoinController
= new BucketShuffleJoinController(fragmentIdToScanNodeIds);
- // record backend execute state
- // TODO(zhaochun): add profile information and others
- public class BackendExecState {
- TExecPlanFragmentParams rpcParams;
- PlanFragmentId fragmentId;
- boolean initiated;
- volatile boolean done;
- TNetworkAddress brpcAddress;
- TNetworkAddress address;
- Backend backend;
- long lastMissingHeartbeatTime = -1;
- TUniqueId instanceId;
-
- public BackendExecState(PlanFragmentId fragmentId, int instanceId,
- TExecPlanFragmentParams rpcParams, Map<TNetworkAddress, Long> addressToBackendID,
- ExecutionProfile executionProfile) {
- this.fragmentId = fragmentId;
- this.rpcParams = rpcParams;
- this.initiated = false;
- this.done = false;
- FInstanceExecParam fi = fragmentExecParamsMap.get(fragmentId).instanceExecParams.get(instanceId);
- this.instanceId = fi.instanceId;
- this.address = fi.host;
- this.backend = idToBackend.get(addressToBackendID.get(address));
- this.brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
- this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime();
- String profileName = "Instance " + DebugUtil.printId(
- fi.instanceId) + " (host=" + this.backend.getHeartbeatAddress() + ")";
- RuntimeProfile instanceProfile = new RuntimeProfile(profileName);
- executionProfile.addInstanceProfile(fragmentId, fi.instanceId, instanceProfile);
- }
-
- /**
- * Some information common to all Fragments does not need to be sent repeatedly.
- * Therefore, when we confirm that a certain BE has accepted the information,
- * we will delete the information in the subsequent Fragment to avoid repeated sending.
- * This information can be obtained from the cache of BE.
- */
- public void unsetFields() {
- this.rpcParams.unsetDescTbl();
- this.rpcParams.unsetFileScanParams();
- this.rpcParams.unsetCoord();
- this.rpcParams.unsetQueryGlobals();
- this.rpcParams.unsetResourceInfo();
- this.rpcParams.setIsSimplifiedParam(true);
- }
-
- // update the instance status, if it is already finished, then not update any more.
- public synchronized boolean updateInstanceStatus(TReportExecStatusParams params) {
- if (this.done) {
- // duplicate packet
- return false;
- }
- this.done = params.done;
- if (statsErrorEstimator != null) {
- statsErrorEstimator.updateExactReturnedRows(params);
- }
- return true;
- }
-
- public boolean isBackendStateHealthy() {
- if (backend.getLastMissingHeartbeatTime() > lastMissingHeartbeatTime && !backend.isAlive()) {
- LOG.warn("backend {} is down while joining the coordinator. job id: {}",
- backend.getId(), jobId);
- return false;
- }
- return true;
- }
-
- public FragmentInstanceInfo buildFragmentInstanceInfo() {
- return new QueryStatisticsItem.FragmentInstanceInfo.Builder().instanceId(fragmentInstanceId())
- .fragmentId(String.valueOf(fragmentId)).address(this.address).build();
- }
-
- private TUniqueId fragmentInstanceId() {
- return this.rpcParams.params.getFragmentInstanceId();
- }
- }
-
public class PipelineExecContext {
TPipelineFragmentParams rpcParams;
PlanFragmentId fragmentId;
@@ -3272,31 +2763,12 @@ public class Coordinator implements CoordInterface {
if (!params.done) {
return false;
}
- if (enablePipelineX) {
- if (this.done) {
- // duplicate packet
- return false;
- }
- this.done = true;
- return true;
- } else {
- // could not find the related instances, not update and return false, to indicate
- // that the caller should not update any more.
- if (!fragmentInstancesMap.containsKey(params.fragment_instance_id)) {
- return false;
- }
- Boolean instanceDone = fragmentInstancesMap.get(params.fragment_instance_id);
- if (instanceDone) {
- // duplicate packet
- return false;
- }
- fragmentInstancesMap.put(params.fragment_instance_id, true);
- profileReportProgress++;
- if (profileReportProgress == numInstances) {
- this.done = true;
- }
- return true;
+ if (this.done) {
+ // duplicate packet
+ return false;
}
+ this.done = true;
+ return true;
}
public boolean isBackendStateHealthy() {
@@ -3315,159 +2787,6 @@ public class Coordinator implements CoordInterface {
}
}
- /**
- * A set of BackendExecState for same Backend
- */
- public class BackendExecStates {
- long beId;
- TNetworkAddress brpcAddr;
- List<BackendExecState> states = Lists.newArrayList();
- boolean twoPhaseExecution = false;
- long beProcessEpoch = 0;
- boolean hasCancelled = false;
- boolean cancelInProcess = false;
-
- public BackendExecStates(long beId, TNetworkAddress brpcAddr, boolean twoPhaseExecution, long beProcessEpoch) {
- this.beId = beId;
- this.brpcAddr = brpcAddr;
- this.twoPhaseExecution = twoPhaseExecution;
- this.beProcessEpoch = beProcessEpoch;
- }
-
- public void addState(BackendExecState state) {
- this.states.add(state);
- }
-
- /**
- * The BackendExecState in states are all send to the same BE.
- * So only the first BackendExecState need to carry some common fields, such as DescriptorTbl,
- * the other BackendExecState does not need those fields. Unset them to reduce size.
- */
- public void unsetFields() {
- boolean first = true;
- for (BackendExecState state : states) {
- if (first) {
- first = false;
- continue;
- }
- state.unsetFields();
- }
- }
-
- public Future<InternalService.PExecPlanFragmentResult> execRemoteFragmentsAsync(BackendServiceProxy proxy)
- throws TException {
- try {
- TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList();
- for (BackendExecState state : states) {
- state.initiated = true;
- paramsList.addToParamsList(state.rpcParams);
- }
- return proxy.execPlanFragmentsAsync(brpcAddr, paramsList, twoPhaseExecution);
- } catch (RpcException e) {
- // DO NOT throw exception here, return a complete future with error code,
- // so that the following logic will cancel the fragment.
- return futureWithException(e);
- }
- }
-
- public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentStartAsync(BackendServiceProxy proxy)
- throws TException {
- try {
- PExecPlanFragmentStartRequest.Builder builder = PExecPlanFragmentStartRequest.newBuilder();
- PUniqueId qid = PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build();
- builder.setQueryId(qid);
- return proxy.execPlanFragmentStartAsync(brpcAddr, builder.build());
- } catch (RpcException e) {
- // DO NOT throw exception here, return a complete future with error code,
- // so that the following logic will cancel the fragment.
- return futureWithException(e);
- }
- }
-
- @NotNull
- private Future<PExecPlanFragmentResult> futureWithException(RpcException e) {
- return new Future<PExecPlanFragmentResult>() {
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return false;
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return true;
- }
-
- @Override
- public PExecPlanFragmentResult get() {
- PExecPlanFragmentResult result = PExecPlanFragmentResult.newBuilder().setStatus(
- Types.PStatus.newBuilder().addErrorMsgs(e.getMessage())
- .setStatusCode(TStatusCode.THRIFT_RPC_ERROR.getValue()).build()).build();
- return result;
- }
-
- @Override
- public PExecPlanFragmentResult get(long timeout, TimeUnit unit) {
- return get();
- }
- };
- }
-
- // cancel the fragment instance.
- // return true if cancel success. Otherwise, return false
- public synchronized void cancelQuery(Status cancelReason) {
- LOG.warn("cancelRemoteFragments backend: {}, reason: {}",
- idToBackend.get(beId), cancelReason.toString());
- try {
- if (this.hasCancelled || this.cancelInProcess) {
- LOG.info("Fragment instance has already been cancelled {}
or under cancel {}."
- + " backend: {}, reason: {}",
- this.hasCancelled, this.cancelInProcess,
- idToBackend.get(beId), cancelReason.toString());
- return;
- }
- try {
- ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelResult =
- BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddr, cancelReason);
- Futures.addCallback(cancelResult, new FutureCallback<InternalService.PCancelPlanFragmentResult>() {
- public void onSuccess(InternalService.PCancelPlanFragmentResult result) {
- cancelInProcess = false;
- if (result.hasStatus()) {
- Status status = new Status(result.getStatus());
- if (status.getErrorCode() == TStatusCode.OK) {
- hasCancelled = true;
- } else {
- LOG.warn("Failed to cancel query {}
backend: {}, reason: {}",
- DebugUtil.printId(queryId),
idToBackend.get(beId), status.toString());
- }
- }
- LOG.warn("Failed to cancel query {} backend: {},
without status",
- DebugUtil.printId(queryId),
idToBackend.get(beId));
- }
-
- public void onFailure(Throwable t) {
- cancelInProcess = false;
- LOG.warn("Failed to cancel query {} backend: {},
reason: {}",
- DebugUtil.printId(queryId),
idToBackend.get(beId), cancelReason.toString(), t);
- }
- }, backendRpcCallbackExecutor);
- cancelInProcess = true;
- } catch (RpcException e) {
- LOG.warn("cancel plan fragment get a exception,
address={}:{}", brpcAddr.getHostname(),
- brpcAddr.getPort());
-
SimpleScheduler.addToBlacklist(addressToBackendID.get(brpcAddr),
e.getMessage());
- }
-
- } catch (Exception e) {
- LOG.warn("catch a exception", e);
- }
- }
- }
-
public class PipelineExecContexts {
long beId;
TNetworkAddress brpcAddr;
@@ -4012,26 +3331,13 @@ public class Coordinator implements CoordInterface {
Lists.newArrayList();
lock();
try {
- if (enablePipelineEngine) {
- for (int index = 0; index < fragments.size(); index++) {
- for (PipelineExecContext ctx : pipelineExecContexts.values()) {
- if (fragments.get(index).getFragmentId() != ctx.fragmentId) {
- continue;
- }
- final List<QueryStatisticsItem.FragmentInstanceInfo> info = ctx.buildFragmentInstanceInfo();
- result.addAll(info);
- }
- }
- } else {
- for (int index = 0; index < fragments.size(); index++) {
- for (BackendExecState backendExecState : backendExecStates) {
- if (fragments.get(index).getFragmentId() != backendExecState.fragmentId) {
- continue;
- }
- final QueryStatisticsItem.FragmentInstanceInfo info =
- backendExecState.buildFragmentInstanceInfo();
- result.add(info);
+ for (int index = 0; index < fragments.size(); index++) {
+ for (PipelineExecContext ctx : pipelineExecContexts.values()) {
+ if (fragments.get(index).getFragmentId() != ctx.fragmentId) {
+ continue;
}
+ final List<QueryStatisticsItem.FragmentInstanceInfo> info = ctx.buildFragmentInstanceInfo();
+ result.addAll(info);
}
}
} finally {
@@ -4043,16 +3349,9 @@ public class Coordinator implements CoordInterface {
@Override
public List<TNetworkAddress> getInvolvedBackends() {
List<TNetworkAddress> backendAddresses = Lists.newArrayList();
- if (this.enablePipelineXEngine) {
- for (Long backendId : this.beToPipelineExecCtxs.keySet()) {
- Backend backend = idToBackend.get(backendId);
- backendAddresses.add(new TNetworkAddress(backend.getHost(), backend.getBePort()));
- }
- } else {
- for (Long backendId : this.beToExecStates.keySet()) {
- Backend backend = idToBackend.get(backendId);
- backendAddresses.add(new TNetworkAddress(backend.getHost(), backend.getBePort()));
- }
+ for (Long backendId : this.beToPipelineExecCtxs.keySet()) {
+ Backend backend = idToBackend.get(backendId);
+ backendAddresses.add(new TNetworkAddress(backend.getHost(), backend.getBePort()));
}
return backendAddresses;
}
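
Note on the deleted sendFragment() path, for readers of this diff: as its
removed javadoc explains, fragment instances were grouped per backend and,
for plans with two or more fragments, dispatched with at most two RPCs per
BE: a prepare RPC carrying all instances, then a start RPC that triggers
execution. Below is a simplified sketch reconstructed from the removed code
(error handling, metrics, and the Triple/future plumbing are elided); the
surviving pipeline path keeps an analogous scheme via sendPipelineCtx() and
waitPipelineRpc() over PipelineExecContexts:

    boolean twoPhaseExecution = fragments.size() >= 2;

    // Phase 1: one RPC per BE ships all of that BE's fragment instances.
    // With twoPhaseExecution set, BEs only prepare and wait for a trigger.
    for (BackendExecStates states : beToExecStates.values()) {
        states.unsetFields(); // only the first instance keeps common fields such as the desc table
        futures.add(ImmutableTriple.of(states, proxy, states.execRemoteFragmentsAsync(proxy)));
    }
    waitRpc(futures, timeoutDeadline - System.currentTimeMillis(), "send fragments");

    // Phase 2: one more RPC per BE starts everything prepared in phase 1.
    if (twoPhaseExecution) {
        futures.clear();
        for (BackendExecStates states : beToExecStates.values()) {
            futures.add(ImmutableTriple.of(states, proxy, states.execPlanFragmentStartAsync(proxy)));
        }
        waitRpc(futures, timeoutDeadline - System.currentTimeMillis(), "send execution start");
    }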
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]