zzzzzzzs commented on code in PR #31259:
URL: https://github.com/apache/doris/pull/31259#discussion_r1515362656
##########
fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java:
##########
@@ -2967,6 +2972,141 @@ public List<Type> getReturnTypes() {
return exprToType(parsedStmt.getResultExprs());
}
+ private HttpStreamParams generateStreamLoadNereidsPlan(TUniqueId queryId) {
+ LOG.info("TUniqueId: {} generate stream load plan", queryId);
+ context.setQueryId(queryId);
+ context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());
+
+ parseByNereids();
+ Preconditions.checkState(parsedStmt instanceof LogicalPlanAdapter,
+ "Nereids only process LogicalPlanAdapter, but parsedStmt is "
+ parsedStmt.getClass().getName());
+ context.getState().setNereids(true);
+ InsertIntoTableCommand insert = (InsertIntoTableCommand)
((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
+ HttpStreamParams httpStreamParams = new HttpStreamParams();
+
+ try {
+ if
(!StringUtils.isEmpty(context.getSessionVariable().groupCommit)) {
+ if (!Config.wait_internal_group_commit_finish &&
insert.getLabelName().isPresent()) {
+ throw new AnalysisException("label and group_commit can't
be set at the same time");
+ }
+ context.setGroupCommitStreamLoadSql(true);
+ }
+ OlapInsertExecutor insertExecutor = (OlapInsertExecutor)
insert.initPlan(context, this);
+ httpStreamParams.setTxnId(insertExecutor.getTxnId());
+ httpStreamParams.setDb(insertExecutor.getDatabase());
+ httpStreamParams.setTable(insertExecutor.getTable());
+ httpStreamParams.setLabel(insertExecutor.getLabelName());
+
+ PlanNode planRoot = planner.getFragments().get(0).getPlanRoot();
+ Preconditions.checkState(planRoot instanceof TVFScanNode ||
planRoot instanceof GroupCommitScanNode,
+ "Nereids' planNode cannot be converted to " +
planRoot.getClass().getName());
+ } catch (QueryStateException e) {
+ LOG.debug("Command(" + originStmt.originStmt + ") process
failed.", e);
+ context.setState(e.getQueryState());
+ throw new NereidsException("Command(" + originStmt.originStmt + ")
process failed",
+ new AnalysisException(e.getMessage(), e));
+ } catch (UserException e) {
+ // Return a message to inform the client of what happened.
+ LOG.debug("Command(" + originStmt.originStmt + ") process
failed.", e);
+ context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
+ throw new NereidsException("Command (" + originStmt.originStmt +
") process failed",
+ new AnalysisException(e.getMessage(), e));
+ } catch (Exception e) {
+ // Maybe our bug
+ LOG.debug("Command (" + originStmt.originStmt + ") process
failed.", e);
+ context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR,
e.getMessage());
+ throw new NereidsException("Command (" + originStmt.originStmt +
") process failed.",
+ new AnalysisException(e.getMessage(), e));
+ }
+ return httpStreamParams;
+ }
+
+ private HttpStreamParams generateStreamLoadLegacyPlan(TUniqueId queryId)
throws Exception {
+ // Due to executing Nereids, it needs to be reset
+ planner = null;
+ context.getState().setNereids(false);
+ context.setTxnEntry(null);
+ context.setQueryId(queryId);
+ context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());
+ SqlScanner input = new SqlScanner(new
StringReader(originStmt.originStmt),
+ context.getSessionVariable().getSqlMode());
+ SqlParser parser = new SqlParser(input);
+ parsedStmt = SqlParserUtils.getFirstStmt(parser);
+ if (!StringUtils.isEmpty(context.getSessionVariable().groupCommit)) {
+ if (!Config.wait_internal_group_commit_finish &&
((NativeInsertStmt) parsedStmt).getLabel() != null) {
+ throw new AnalysisException("label and group_commit can't be
set at the same time");
+ }
+ ((NativeInsertStmt) parsedStmt).isGroupCommitStreamLoadSql = true;
+ }
+ NativeInsertStmt insertStmt = (NativeInsertStmt) parsedStmt;
+ analyze(context.getSessionVariable().toThrift());
+ HttpStreamParams httpStreamParams = new HttpStreamParams();
+ httpStreamParams.setTxnId(insertStmt.getTransactionId());
+ httpStreamParams.setDb(insertStmt.getDbObj());
+ httpStreamParams.setTable(insertStmt.getTargetTable());
+ httpStreamParams.setLabel(insertStmt.getLabel());
+ return httpStreamParams;
+ }
+
+ public HttpStreamParams generateStreamLoadPlan(TUniqueId queryId) throws
Exception {
Review Comment:
done
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java:
##########
@@ -117,8 +122,8 @@ private void runInternal(ConnectContext ctx, StmtExecutor
executor) throws Excep
// check auth
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(),
targetTableIf.getDatabase().getCatalog().getName(),
- targetTableIf.getDatabase().getFullName(),
targetTableIf.getName(),
- PrivPredicate.LOAD)) {
+ targetTableIf.getDatabase().getFullName(),
targetTableIf.getName(),
+ PrivPredicate.LOAD)) {
Review Comment:
done
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java:
##########
@@ -150,14 +155,15 @@ private void runInternal(ConnectContext ctx, StmtExecutor
executor) throws Excep
if (physicalSink instanceof PhysicalOlapTableSink) {
if (GroupCommitInserter.groupCommit(ctx, sink, physicalSink)) {
- return;
+ // return;
+ throw new AnalysisException("group commit is not supported
in Nereids now");
}
OlapTable olapTable = (OlapTable) targetTableIf;
insertExecutor = new OlapInsertExecutor(ctx, olapTable, label,
planner, insertCtx);
boolean isEnableMemtableOnSinkNode =
olapTable.getTableProperty().getUseSchemaLightChange()
- ?
insertExecutor.getCoordinator().getQueryOptions().isEnableMemtableOnSinkNode()
- : false;
+ ?
insertExecutor.getCoordinator().getQueryOptions().isEnableMemtableOnSinkNode()
+ : false;
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]