yiguolei commented on code in PR #16940:
URL: https://github.com/apache/doris/pull/16940#discussion_r1133604459


##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -1177,6 +1213,105 @@ public TStreamLoadPutResult 
streamLoadPut(TStreamLoadPutRequest request) {
         return result;
     }
 
+    public class ReportStreamLoadWorker implements Runnable {
+        private long backendId;
+        private TUniqueId loadId;
+        private int execTimeout;
+
+        public ReportStreamLoadWorker(long backendId, TUniqueId loadId, int 
execTimeout) {
+            this.backendId = backendId;
+            this.loadId = loadId;
+            this.execTimeout = execTimeout;
+        }
+
+        @Override
+        public void run() {
+            Coordinator coord = 
QeProcessorImpl.INSTANCE.getCoordinator(loadId);
+            boolean notTimeout = coord.join(execTimeout);
+            // check stream load exec status
+            Status status = Status.OK;
+            if (!coord.isDone()) {
+                coord.cancel();
+                if (notTimeout) {
+                    String errMsg = coord.getExecStatus().getErrorMsg();
+                    status = new Status(ErrCode.COMMON_ERROR, "There exists 
unhealthy backend. " + errMsg);
+                } else {
+                    status = new Status(ErrCode.TIMEOUT, "");
+                }
+                QeProcessorImpl.INSTANCE.unregisterQuery(loadId);
+            }
+            if (!coord.getExecStatus().ok()) {
+                String errMsg = coord.getExecStatus().getErrorMsg();
+                LOG.warn("stream load failed: {}", errMsg);
+                status = new Status(ErrCode.COMMON_ERROR, errMsg);
+                QeProcessorImpl.INSTANCE.unregisterQuery(loadId);
+            }
+            PReportStreamLoadStatusRequest request = 
InternalService.PReportStreamLoadStatusRequest.newBuilder()
+                    
.setLoadId(PUniqueId.newBuilder().setHi(loadId.hi).setLo(loadId.lo).build())
+                    
.setStatus(Types.PStatus.newBuilder().addErrorMsgs(status.getErrMsg())
+                                    
.setStatusCode(status.getErrCode().ordinal()).build())
+                    .build();
+
+            // get backend address
+            ImmutableMap<Long, Backend> backendMap = 
Env.getCurrentSystemInfo().getIdToBackend();
+            Backend be = backendMap.get(backendId);
+            TNetworkAddress address;
+            if (be == null || !be.isAlive()) {
+                LOG.warn("report stream load failed. no backend");
+                return;
+            }
+            address = new TNetworkAddress(be.getIp(), be.getBrpcPort());
+            try {
+                
BackendServiceProxy.getInstance().reportStreamLoadStatus(address, request);
+            } catch (Throwable e) {
+                LOG.warn("report stream load failed.", e);
+            }
+        }
+    }
+
+    private void streamLoadPutWithSqlImpl(TStreamLoadPutRequest request) 
throws UserException {
+        LOG.info("receive stream load put request");
+        String loadSql = request.getLoadSql();
+        ConnectContext ctx = new ConnectContext(null);
+        ctx.setEnv(Env.getCurrentEnv());
+        ctx.setQueryId(request.getLoadId());
+        ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER);
+        ctx.setCurrentUserIdentity(UserIdentity.ROOT);
+        ctx.setQualifiedUser(UserIdentity.ROOT.getQualifiedUser());
+        ctx.setThreadLocalInfo();
+        ctx.setBackendId(request.getBackendId());
+        StreamLoadTask streamLoadTask = 
StreamLoadTask.fromTStreamLoadPutRequest(request);
+        ctx.setStreamLoadInfo(streamLoadTask);
+        ctx.setLoadId(request.getLoadId());
+        SqlScanner input = new SqlScanner(new StringReader(loadSql), 
ctx.getSessionVariable().getSqlMode());
+        SqlParser parser = new SqlParser(input);
+        try {
+            StatementBase parsedStmt = SqlParserUtils.getFirstStmt(parser);
+            parsedStmt.setOrigStmt(new OriginStatement(loadSql, 0));
+            parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
+            StmtExecutor executor = new StmtExecutor(ctx, parsedStmt);
+            ctx.setExecutor(executor);
+            TQueryOptions tQueryOptions = ctx.getSessionVariable().toThrift();
+            executor.analyze(tQueryOptions);
+            Coordinator coord = new Coordinator(ctx, executor.getAnalyzer(), 
executor.planner());
+            coord.setLoadMemLimit(request.getExecMemLimit());
+            coord.setQueryType(TQueryType.LOAD);
+            QeProcessorImpl.INSTANCE.registerQuery(request.getLoadId(), coord);
+            coord.exec();
+        } catch (UserException e) {

Review Comment:
   This `catch (UserException e)` clause is redundant, since `Throwable` is already caught at line 1304.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to