yiguolei commented on code in PR #16940:
URL: https://github.com/apache/doris/pull/16940#discussion_r1133603827
##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -1177,6 +1213,105 @@ public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) {
return result;
}
+ /**
+  * Background task that waits for the coordinator of a stream load to finish and then
+  * reports the resulting status to the backend identified by {@code backendId}.
+  *
+  * <p>The task never throws: every failure path is logged and the task simply returns.
+  */
+ public class ReportStreamLoadWorker implements Runnable {
+     private final long backendId;
+     private final TUniqueId loadId;
+     // Passed unchanged to Coordinator#join; presumably seconds -- TODO confirm against Coordinator.
+     private final int execTimeout;
+
+     /**
+      * @param backendId id of the backend that should receive the status report
+      * @param loadId id under which the load's coordinator is registered in QeProcessorImpl
+      * @param execTimeout maximum time to wait for the coordinator, forwarded to Coordinator#join
+      */
+     public ReportStreamLoadWorker(long backendId, TUniqueId loadId, int execTimeout) {
+         this.backendId = backendId;
+         this.loadId = loadId;
+         this.execTimeout = execTimeout;
+     }
+
+     @Override
+     public void run() {
+         Coordinator coord = QeProcessorImpl.INSTANCE.getCoordinator(loadId);
+         // NOTE(review): the lookup presumably yields null when the load is not (or no longer)
+         // registered; guard so the worker does not die with a NullPointerException.
+         if (coord == null) {
+             LOG.warn("report stream load failed. no coordinator for load {}", loadId);
+             return;
+         }
+         boolean notTimeout = coord.join(execTimeout);
+         // check stream load exec status
+         Status status = Status.OK;
+         if (!coord.isDone()) {
+             coord.cancel();
+             if (notTimeout) {
+                 String errMsg = coord.getExecStatus().getErrorMsg();
+                 status = new Status(ErrCode.COMMON_ERROR, "There exists unhealthy backend. " + errMsg);
+             } else {
+                 status = new Status(ErrCode.TIMEOUT, "");
+             }
+             QeProcessorImpl.INSTANCE.unregisterQuery(loadId);
+         } else if (!coord.getExecStatus().ok()) {
+             // Deliberately an else-branch: as two independent checks, the status chosen
+             // above (TIMEOUT / unhealthy backend) could be overwritten right after cancel()
+             // and the query could be unregistered twice.
+             String errMsg = coord.getExecStatus().getErrorMsg();
+             LOG.warn("stream load failed: {}", errMsg);
+             status = new Status(ErrCode.COMMON_ERROR, errMsg);
+             QeProcessorImpl.INSTANCE.unregisterQuery(loadId);
+         }
+         PReportStreamLoadStatusRequest request =
+                 InternalService.PReportStreamLoadStatusRequest.newBuilder()
+                         .setLoadId(PUniqueId.newBuilder().setHi(loadId.hi).setLo(loadId.lo).build())
+                         .setStatus(Types.PStatus.newBuilder().addErrorMsgs(status.getErrMsg())
+                                 .setStatusCode(status.getErrCode().ordinal()).build())
+                         .build();
+
+         // get backend address
+         ImmutableMap<Long, Backend> backendMap = Env.getCurrentSystemInfo().getIdToBackend();
+         Backend be = backendMap.get(backendId);
+         if (be == null || !be.isAlive()) {
+             LOG.warn("report stream load failed. no backend");
+             return;
+         }
+         TNetworkAddress address = new TNetworkAddress(be.getIp(), be.getBrpcPort());
+         try {
+             BackendServiceProxy.getInstance().reportStreamLoadStatus(address, request);
+         } catch (Throwable e) {
+             // Best effort: a failed report is logged, never propagated out of the worker.
+             LOG.warn("report stream load failed.", e);
+         }
+     }
+ }
+
+ private void streamLoadPutWithSqlImpl(TStreamLoadPutRequest request)
throws UserException {
+ LOG.info("receive stream load put request");
+ String loadSql = request.getLoadSql();
+ ConnectContext ctx = new ConnectContext(null);
+ ctx.setEnv(Env.getCurrentEnv());
+ ctx.setQueryId(request.getLoadId());
+ ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER);
+ ctx.setCurrentUserIdentity(UserIdentity.ROOT);
+ ctx.setQualifiedUser(UserIdentity.ROOT.getQualifiedUser());
+ ctx.setThreadLocalInfo();
+ ctx.setBackendId(request.getBackendId());
+ StreamLoadTask streamLoadTask =
StreamLoadTask.fromTStreamLoadPutRequest(request);
+ ctx.setStreamLoadInfo(streamLoadTask);
+ ctx.setLoadId(request.getLoadId());
+ SqlScanner input = new SqlScanner(new StringReader(loadSql),
ctx.getSessionVariable().getSqlMode());
+ SqlParser parser = new SqlParser(input);
+ try {
+ StatementBase parsedStmt = SqlParserUtils.getFirstStmt(parser);
+ parsedStmt.setOrigStmt(new OriginStatement(loadSql, 0));
+ parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
+ StmtExecutor executor = new StmtExecutor(ctx, parsedStmt);
+ ctx.setExecutor(executor);
+ TQueryOptions tQueryOptions = ctx.getSessionVariable().toThrift();
+ executor.analyze(tQueryOptions);
+ Coordinator coord = new Coordinator(ctx, executor.getAnalyzer(),
executor.planner());
+ coord.setLoadMemLimit(request.getExecMemLimit());
+ coord.setQueryType(TQueryType.LOAD);
+ QeProcessorImpl.INSTANCE.registerQuery(request.getLoadId(), coord);
+ coord.exec();
+ } catch (UserException e) {
+ LOG.warn("exec sql error {}", e.getMessage());
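Review Comment:
Pass the exception object itself rather than only `e.getMessage()`, so the stack trace is logged as well:
LOG.warn("exec sql error {}", e);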
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]