This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new efe684572e3 [Enhancement](Load) Nereids supports http_stream and group_commit with stream load (#31259)
efe684572e3 is described below
commit efe684572e31887bdd7d914710f13a030b6701a3
Author: 赵硕 <[email protected]>
AuthorDate: Fri Mar 8 13:47:29 2024 +0800
[Enhancement](Load) Nereids supports http_stream and group_commit with stream load (#31259)
---
.../glue/translator/PhysicalPlanTranslator.java | 13 +-
.../commands/insert/AbstractInsertExecutor.java | 13 ++
.../commands/insert/InsertIntoTableCommand.java | 15 ++-
.../java/org/apache/doris/qe/ConnectContext.java | 9 ++
.../java/org/apache/doris/qe/HttpStreamParams.java | 33 +++++
.../java/org/apache/doris/qe/StmtExecutor.java | 140 +++++++++++++++++++++
.../apache/doris/service/FrontendServiceImpl.java | 99 +++++++--------
7 files changed, 266 insertions(+), 56 deletions(-)
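
Reviewer note: a rough sketch of how the group_commit flag travels through the new http_stream path implemented by the diffs below. The snippet is illustrative only (simplified, no error handling); request and ctx stand for the TStreamLoadPutRequest and ConnectContext that are in scope in the real code.

    // Illustrative sketch (not part of the patch); request/ctx are the
    // TStreamLoadPutRequest and ConnectContext in scope in the real code.
    // 1. FrontendServiceImpl#httpStreamPutImpl copies the request's mode into the session variable:
    ctx.getSessionVariable().groupCommit = request.getGroupCommitMode();
    // 2. StmtExecutor#generateHttpStreamNereidsPlan flags the connect context when a mode is set:
    if (!StringUtils.isEmpty(ctx.getSessionVariable().groupCommit)) {
        ctx.setGroupCommitStreamLoadSql(true);
    }
    // 3. PhysicalPlanTranslator checks isGroupCommitStreamLoadSql() and emits a
    //    GroupCommitBlockSink instead of a plain OlapTableSink.
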
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index c489c06b548..afad1a10970 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -163,6 +163,7 @@ import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.EmptySetNode;
import org.apache.doris.planner.ExceptNode;
import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.GroupCommitBlockSink;
import org.apache.doris.planner.HashJoinNode;
import org.apache.doris.planner.HashJoinNode.DistributionMode;
import org.apache.doris.planner.HiveTableSink;
@@ -419,12 +420,20 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
slotDesc.setIsNullable(column.isAllowNull());
slotDesc.setAutoInc(column.isAutoInc());
}
- OlapTableSink sink = new OlapTableSink(
+ OlapTableSink sink;
+ // This branch is only taken in group_commit mode for http_stream.
+ if (context.getConnectContext().isGroupCommitStreamLoadSql()) {
+ sink = new GroupCommitBlockSink(olapTableSink.getTargetTable(), olapTuple,
+ olapTableSink.getTargetTable().getPartitionIds(), olapTableSink.isSingleReplicaLoad(),
+ context.getSessionVariable().getGroupCommit(), 0);
+ } else {
+ sink = new OlapTableSink(
olapTableSink.getTargetTable(),
olapTuple,
olapTableSink.getPartitionIds().isEmpty() ? null : olapTableSink.getPartitionIds(),
olapTableSink.isSingleReplicaLoad()
- );
+ );
+ }
sink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateCols);
rootFragment.setSink(sink);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index a9b3ae6d632..2ee55caf18d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -55,6 +55,7 @@ public abstract class AbstractInsertExecutor {
protected final long createTime = System.currentTimeMillis();
protected long loadedRows = 0;
protected int filteredRows = 0;
+
protected String errMsg = "";
protected Optional<InsertCommandContext> insertCtx;
@@ -75,6 +76,18 @@ public abstract class AbstractInsertExecutor {
return coordinator;
}
+ public DatabaseIf getDatabase() {
+ return database;
+ }
+
+ public TableIf getTable() {
+ return table;
+ }
+
+ public String getLabelName() {
+ return labelName;
+ }
+
/**
* begin transaction if necessary
*/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index cf70ccd56da..5c727ec2220 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -105,7 +105,12 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
runInternal(ctx, executor);
}
- private void runInternal(ConnectContext ctx, StmtExecutor executor) throws Exception {
+ /**
+ * This method generates the plan for Nereids.
+ * Some load paths, such as stream_load, only need the plan.
+ * Therefore, this part is factored out into a separate method.
+ */
+ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor) throws Exception {
if (!ctx.getSessionVariable().isEnableNereidsDML()) {
try {
ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce();
@@ -151,7 +156,8 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
if (physicalSink instanceof PhysicalOlapTableSink) {
if (GroupCommitInserter.groupCommit(ctx, sink, physicalSink)) {
- return;
+ // return;
+ throw new AnalysisException("group commit is not supported in Nereids now");
}
OlapTable olapTable = (OlapTable) targetTableIf;
insertExecutor = new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx);
@@ -180,6 +186,11 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
// We exposed @StmtExecutor#cancel as a unified entry point for statement interruption,
// so we need to set this here
executor.setCoord(insertExecutor.getCoordinator());
+ return insertExecutor;
+ }
+
+ private void runInternal(ConnectContext ctx, StmtExecutor executor) throws Exception {
+ AbstractInsertExecutor insertExecutor = initPlan(ctx, executor);
insertExecutor.executeSingleInsert(executor, jobId);
}
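
The newly public initPlan, together with the getters added to AbstractInsertExecutor above, lets stream load reuse Nereids insert planning without executing the insert. A minimal caller-side sketch, mirroring generateHttpStreamNereidsPlan in the StmtExecutor diff below (ctx, executor and parsedStmt are assumed to be in scope):

    // Sketch only: plan the INSERT through Nereids and read back the metadata stream load needs.
    InsertIntoTableCommand insert =
            (InsertIntoTableCommand) ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
    AbstractInsertExecutor insertExecutor = insert.initPlan(ctx, executor); // plans and begins the txn, does not execute
    DatabaseIf db = insertExecutor.getDatabase();
    TableIf table = insertExecutor.getTable();
    String label = insertExecutor.getLabelName();
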
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 1c8e72013d7..18bdee1396e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -219,6 +219,7 @@ public class ConnectContext {
private String workloadGroupName = "";
private Map<Long, Backend> insertGroupCommitTableToBeMap = new HashMap<>();
+ private boolean isGroupCommitStreamLoadSql;
private TResultSinkType resultSinkType = TResultSinkType.MYSQL_PROTOCAL;
@@ -1100,4 +1101,12 @@ public class ConnectContext {
public int getNetWriteTimeout() {
return this.sessionVariable.getNetWriteTimeout();
}
+
+ public boolean isGroupCommitStreamLoadSql() {
+ return isGroupCommitStreamLoadSql;
+ }
+
+ public void setGroupCommitStreamLoadSql(boolean groupCommitStreamLoadSql) {
+ isGroupCommitStreamLoadSql = groupCommitStreamLoadSql;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/HttpStreamParams.java b/fe/fe-core/src/main/java/org/apache/doris/qe/HttpStreamParams.java
new file mode 100644
index 00000000000..7383392434b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/HttpStreamParams.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.thrift.TExecPlanFragmentParams;
+
+import lombok.Data;
+
+@Data
+public class HttpStreamParams {
+ private TExecPlanFragmentParams params;
+ private long txnId;
+ private DatabaseIf db;
+ private TableIf table;
+ private String label;
+}
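
Because the class is annotated with Lombok @Data, the getters and setters used elsewhere in this patch (setTxnId, getDb, getParams, and so on) are generated rather than hand-written. A small usage sketch with placeholder values:

    // Sketch only: HttpStreamParams is a plain carrier object filled by the planner code.
    HttpStreamParams params = new HttpStreamParams();
    params.setTxnId(12345L);             // placeholder txn id
    params.setLabel("example_label");    // placeholder label
    // setDb/setTable follow the same pattern; setParams is filled later
    // from Coordinator#getStreamLoadPlan() in FrontendServiceImpl.
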
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 74d9a63365b..990a61e2aef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -110,6 +110,7 @@ import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.jdbc.client.JdbcClientException;
+import org.apache.doris.datasource.tvf.source.TVFScanNode;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.LoadJobRowResult;
import org.apache.doris.load.loadv2.LoadManager;
@@ -135,10 +136,13 @@ import org.apache.doris.nereids.trees.plans.commands.NotAllowFallback;
import org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertExecutor;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.planner.GroupCommitPlanner;
+import org.apache.doris.planner.GroupCommitScanNode;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.OriginalPlanner;
+import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.proto.Data;
@@ -186,6 +190,7 @@ import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.ProtocolStringList;
import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
@@ -2941,6 +2946,141 @@ public class StmtExecutor {
return exprToType(parsedStmt.getResultExprs());
}
+ private HttpStreamParams generateHttpStreamNereidsPlan(TUniqueId queryId) {
+ LOG.info("TUniqueId: {} generate stream load plan", queryId);
+ context.setQueryId(queryId);
+ context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());
+
+ parseByNereids();
+ Preconditions.checkState(parsedStmt instanceof LogicalPlanAdapter, "Nereids only process LogicalPlanAdapter, but parsedStmt is " + parsedStmt.getClass().getName());
+ context.getState().setNereids(true);
+ InsertIntoTableCommand insert = (InsertIntoTableCommand) ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
+ HttpStreamParams httpStreamParams = new HttpStreamParams();
+
+ try {
+ if (!StringUtils.isEmpty(context.getSessionVariable().groupCommit)) {
+ if (!Config.wait_internal_group_commit_finish && insert.getLabelName().isPresent()) {
+ throw new AnalysisException("label and group_commit can't be set at the same time");
+ }
+ context.setGroupCommitStreamLoadSql(true);
+ }
+ OlapInsertExecutor insertExecutor = (OlapInsertExecutor) insert.initPlan(context, this);
+ httpStreamParams.setTxnId(insertExecutor.getTxnId());
+ httpStreamParams.setDb(insertExecutor.getDatabase());
+ httpStreamParams.setTable(insertExecutor.getTable());
+ httpStreamParams.setLabel(insertExecutor.getLabelName());
+
+ PlanNode planRoot = planner.getFragments().get(0).getPlanRoot();
+ Preconditions.checkState(planRoot instanceof TVFScanNode || planRoot instanceof GroupCommitScanNode, "Nereids' planNode cannot be converted to " + planRoot.getClass().getName());
+ } catch (QueryStateException e) {
+ LOG.debug("Command(" + originStmt.originStmt + ") process failed.", e);
+ context.setState(e.getQueryState());
+ throw new NereidsException("Command(" + originStmt.originStmt + ") process failed", new AnalysisException(e.getMessage(), e));
+ } catch (UserException e) {
+ // Return a message to inform the client what happened.
+ LOG.debug("Command(" + originStmt.originStmt + ") process failed.", e);
+ context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
+ throw new NereidsException("Command (" + originStmt.originStmt + ") process failed", new AnalysisException(e.getMessage(), e));
+ } catch (Exception e) {
+ // Maybe our bug
+ LOG.debug("Command (" + originStmt.originStmt + ") process failed.", e);
+ context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, e.getMessage());
+ throw new NereidsException("Command (" + originStmt.originStmt + ") process failed.", new AnalysisException(e.getMessage(), e));
+ }
+ return httpStreamParams;
+ }
+
+ private HttpStreamParams generateHttpStreamLegacyPlan(TUniqueId queryId) throws Exception {
+ // Reset state that may have been set by the Nereids attempt.
+ planner = null;
+ context.getState().setNereids(false);
+ context.setTxnEntry(null);
+ context.setQueryId(queryId);
+ context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());
+ SqlScanner input = new SqlScanner(new StringReader(originStmt.originStmt), context.getSessionVariable().getSqlMode());
+ SqlParser parser = new SqlParser(input);
+ parsedStmt = SqlParserUtils.getFirstStmt(parser);
+ if (!StringUtils.isEmpty(context.getSessionVariable().groupCommit)) {
+ if (!Config.wait_internal_group_commit_finish && ((NativeInsertStmt) parsedStmt).getLabel() != null) {
+ throw new AnalysisException("label and group_commit can't be set at the same time");
+ }
+ ((NativeInsertStmt) parsedStmt).isGroupCommitStreamLoadSql = true;
+ }
+ NativeInsertStmt insertStmt = (NativeInsertStmt) parsedStmt;
+ analyze(context.getSessionVariable().toThrift());
+ HttpStreamParams httpStreamParams = new HttpStreamParams();
+ httpStreamParams.setTxnId(insertStmt.getTransactionId());
+ httpStreamParams.setDb(insertStmt.getDbObj());
+ httpStreamParams.setTable(insertStmt.getTargetTable());
+ httpStreamParams.setLabel(insertStmt.getLabel());
+ return httpStreamParams;
+ }
+
+ public HttpStreamParams generateHttpStreamPlan(TUniqueId queryId) throws Exception {
+ SessionVariable sessionVariable = context.getSessionVariable();
+ HttpStreamParams httpStreamParams = null;
+ try {
+ if (sessionVariable.isEnableNereidsPlanner()) {
+ try {
+ httpStreamParams = generateHttpStreamNereidsPlan(queryId);
+ } catch (NereidsException | ParseException e) {
+ if (context.getMinidump() != null && context.getMinidump().toString(4) != null) {
+ MinidumpUtils.saveMinidumpString(context.getMinidump(), DebugUtil.printId(context.queryId()));
+ }
+ // try to fall back to legacy planner
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("nereids cannot process statement\n" + originStmt.originStmt + "\n because of " + e.getMessage(), e);
+ }
+ if (notAllowFallback()) {
+ LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e);
+ throw ((NereidsException) e).getException();
+ }
+ boolean isInsertIntoCommand = parsedStmt != null && parsedStmt instanceof LogicalPlanAdapter
+ && ((LogicalPlanAdapter) parsedStmt).getLogicalPlan() instanceof InsertIntoTableCommand;
+ if (e instanceof NereidsException
+ && !context.getSessionVariable().enableFallbackToOriginalPlanner
+ && !isInsertIntoCommand) {
+ LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e);
+ throw ((NereidsException) e).getException();
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("fall back to legacy planner on statement:\n{}", originStmt.originStmt);
+ }
+ // Attention: currently an exception from Nereids does not surface as an exception to the user terminal
+ // unless the user disallows fallback to the legacy planner. But the query state has already been set
+ // to Error in this case, which has side effects on the profile result and audit log. So we need to
+ // reset the state to OK before the query is processed by the legacy planner.
+ context.getState().reset();
+ context.getState().setNereids(false);
+ httpStreamParams = generateHttpStreamLegacyPlan(queryId);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ httpStreamParams = generateHttpStreamLegacyPlan(queryId);
+ }
+ } finally {
+ // revert Session Value
+ try {
+ VariableMgr.revertSessionValue(sessionVariable);
+ // origin value init
+ sessionVariable.setIsSingleSetVar(false);
+ sessionVariable.clearSessionOriginValue();
+ } catch (DdlException e) {
+ LOG.warn("failed to revert Session value. {}", context.getQueryIdentifier(), e);
+ context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
+ }
+ }
+ return httpStreamParams;
+ }
+
public SummaryProfile getSummaryProfile() {
return profile.getSummaryProfile();
}
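
Taken together, the contract of the new entry point is: given a query id, generateHttpStreamPlan parses the load SQL, plans it through Nereids when enabled (falling back to the legacy planner otherwise), and returns an HttpStreamParams with db, table, txn id and label filled in, while the executor's planner holds the single fragment whose root is a TVFScanNode or GroupCommitScanNode. A hedged caller-side sketch; the real caller is initHttpStreamPlan in the FrontendServiceImpl diff below:

    // Sketch only: how a caller consumes generateHttpStreamPlan (loadSql is the SQL from the http_stream request).
    StmtExecutor executor = new StmtExecutor(ctx, loadSql);
    ctx.setExecutor(executor);
    HttpStreamParams httpStreamParams = executor.generateHttpStreamPlan(ctx.queryId());
    // The planned fragments are then handed to a Coordinator, whose getStreamLoadPlan()
    // produces the TExecPlanFragmentParams that httpStreamParams carries back to the BE.
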
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 8931d7b0859..f51044ec5dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -21,12 +21,9 @@ import org.apache.doris.analysis.AbstractBackupTableRefClause;
import org.apache.doris.analysis.AddPartitionClause;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.LabelName;
-import org.apache.doris.analysis.NativeInsertStmt;
import org.apache.doris.analysis.PartitionExprUtil;
import org.apache.doris.analysis.RestoreStmt;
import org.apache.doris.analysis.SetType;
-import org.apache.doris.analysis.SqlParser;
-import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.UserIdentity;
@@ -65,7 +62,6 @@ import org.apache.doris.common.ThriftServerEventProcessor;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.common.annotation.LogException;
-import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.cooldown.CooldownDelete;
import org.apache.doris.datasource.CatalogIf;
@@ -90,9 +86,9 @@ import org.apache.doris.qe.ConnectProcessor;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.DdlExecutor;
import org.apache.doris.qe.GlobalVariable;
+import org.apache.doris.qe.HttpStreamParams;
import org.apache.doris.qe.MasterCatalogExecutor;
import org.apache.doris.qe.MysqlConnectProcessor;
-import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
@@ -198,7 +194,6 @@ import org.apache.doris.thrift.TPrivilegeCtrl;
import org.apache.doris.thrift.TPrivilegeHier;
import org.apache.doris.thrift.TPrivilegeStatus;
import org.apache.doris.thrift.TPrivilegeType;
-import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TQueryStatsResult;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TReplicaInfo;
@@ -251,7 +246,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
-import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -2022,12 +2016,42 @@ public class FrontendServiceImpl implements FrontendService.Iface {
return result;
}
+ private HttpStreamParams initHttpStreamPlan(TStreamLoadPutRequest request, ConnectContext ctx) throws UserException {
+ String originStmt = request.getLoadSql();
+ HttpStreamParams httpStreamParams;
+ try {
+ StmtExecutor executor = new StmtExecutor(ctx, originStmt);
+ ctx.setExecutor(executor);
+ httpStreamParams = executor.generateHttpStreamPlan(ctx.queryId());
+
+ Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx);
+ Coordinator coord = new Coordinator(ctx, analyzer, executor.planner());
+ coord.setLoadMemLimit(request.getExecMemLimit());
+ coord.setQueryType(TQueryType.LOAD);
+ TableIf table = httpStreamParams.getTable();
+ if (table instanceof OlapTable) {
+ boolean isEnableMemtableOnSinkNode =
+ ((OlapTable) table).getTableProperty().getUseSchemaLightChange()
+ ? coord.getQueryOptions().isEnableMemtableOnSinkNode() : false;
+ coord.getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
+ }
+ httpStreamParams.setParams(coord.getStreamLoadPlan());
+ } catch (UserException e) {
+ LOG.warn("exec sql error", e);
+ throw new UserException("exec sql error" + e);
+ } catch (Throwable e) {
+ LOG.warn("exec sql error catch unknown result.", e);
+ throw new UserException("exec sql error catch unknown result." + e);
+ }
+ return httpStreamParams;
+ }
+
private void httpStreamPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResult result, ConnectContext ctx)
throws UserException {
if (LOG.isDebugEnabled()) {
LOG.debug("receive http stream put request: {}", request);
}
- String originStmt = request.getLoadSql();
if (request.isSetAuthCode()) {
// TODO(cmy): find a way to check
} else if (Strings.isNullOrEmpty(request.getToken())) {
@@ -2040,55 +2064,26 @@ public class FrontendServiceImpl implements FrontendService.Iface {
} else {
ctx.getSessionVariable().enableMemtableOnSinkNode = Config.stream_load_default_memtable_on_sink_node;
}
- SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
- SqlParser parser = new SqlParser(input);
+ ctx.getSessionVariable().groupCommit = request.getGroupCommitMode();
try {
- NativeInsertStmt parsedStmt = (NativeInsertStmt) SqlParserUtils.getFirstStmt(parser);
- parsedStmt.setOrigStmt(new OriginStatement(originStmt, 0));
- parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
- if (!StringUtils.isEmpty(request.getGroupCommitMode())) {
- if (!Config.wait_internal_group_commit_finish && parsedStmt.getLabel() != null) {
- throw new AnalysisException("label and group_commit can't be set at the same time");
- }
- ctx.getSessionVariable().groupCommit = request.getGroupCommitMode();
- parsedStmt.isGroupCommitStreamLoadSql = true;
- }
- StmtExecutor executor = new StmtExecutor(ctx, parsedStmt);
- ctx.setExecutor(executor);
- TQueryOptions tQueryOptions = ctx.getSessionVariable().toThrift();
- executor.analyze(tQueryOptions);
- Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx);
- Coordinator coord = new Coordinator(ctx, analyzer, executor.planner());
- coord.setLoadMemLimit(request.getExecMemLimit());
- coord.setQueryType(TQueryType.LOAD);
- Table table = parsedStmt.getTargetTable();
- if (table instanceof OlapTable) {
- boolean isEnableMemtableOnSinkNode =
- ((OlapTable) table).getTableProperty().getUseSchemaLightChange()
- ? coord.getQueryOptions().isEnableMemtableOnSinkNode() : false;
- coord.getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
- }
-
- TExecPlanFragmentParams plan = coord.getStreamLoadPlan();
+ HttpStreamParams httpStreamParams = initHttpStreamPlan(request, ctx);
int loadStreamPerNode = 20;
if (request.getStreamPerNode() > 0) {
loadStreamPerNode = request.getStreamPerNode();
}
- plan.setLoadStreamPerNode(loadStreamPerNode);
- plan.setTotalLoadStreams(loadStreamPerNode);
- plan.setNumLocalSink(1);
- final long txn_id = parsedStmt.getTransactionId();
- result.setParams(plan);
- result.getParams().setDbName(parsedStmt.getDbName());
- result.getParams().setTableName(parsedStmt.getTbl());
- // The txn_id here is obtained from the NativeInsertStmt
- result.getParams().setTxnConf(new TTxnParams().setTxnId(txn_id));
- result.getParams().setImportLabel(parsedStmt.getLabel());
- result.setDbId(table.getDatabase().getId());
- result.setTableId(table.getId());
- result.setBaseSchemaVersion(((OlapTable) table).getBaseSchemaVersion());
- result.setGroupCommitIntervalMs(((OlapTable) table).getGroupCommitIntervalMs());
- result.setGroupCommitDataBytes(((OlapTable) table).getGroupCommitDataBytes());
+ httpStreamParams.getParams().setLoadStreamPerNode(loadStreamPerNode);
+ httpStreamParams.getParams().setTotalLoadStreams(loadStreamPerNode);
+ httpStreamParams.getParams().setNumLocalSink(1);
+ result.setParams(httpStreamParams.getParams());
+ result.getParams().setDbName(httpStreamParams.getDb().getFullName());
+ result.getParams().setTableName(httpStreamParams.getTable().getName());
+ result.getParams().setTxnConf(new TTxnParams().setTxnId(httpStreamParams.getTxnId()));
+ result.getParams().setImportLabel(httpStreamParams.getLabel());
+ result.setDbId(httpStreamParams.getDb().getId());
+ result.setTableId(httpStreamParams.getTable().getId());
+ result.setBaseSchemaVersion(((OlapTable) httpStreamParams.getTable()).getBaseSchemaVersion());
+ result.setGroupCommitIntervalMs(((OlapTable) httpStreamParams.getTable()).getGroupCommitIntervalMs());
+ result.setGroupCommitDataBytes(((OlapTable) httpStreamParams.getTable()).getGroupCommitDataBytes());
result.setWaitInternalGroupCommitFinish(Config.wait_internal_group_commit_finish);
} catch (UserException e) {
LOG.warn("exec sql error", e);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]