This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 1104d83ec84 branch-3.1: [fix](Prepared Statment) Fix exec prepared 
insert stmt in non master error #48689 (#52271)
1104d83ec84 is described below

commit 1104d83ec84679de56eff4c029ec63b2fcaa8767
Author: James <[email protected]>
AuthorDate: Fri Jun 27 13:34:06 2025 +0800

    branch-3.1: [fix](Prepared Statment) Fix exec prepared insert stmt in non 
master error #48689 (#52271)
    
    backport: #48689
    
    Co-authored-by: Lijia Liu <[email protected]>
---
 .../java/org/apache/doris/qe/ConnectContext.java   |  7 ++++++
 .../java/org/apache/doris/qe/ConnectProcessor.java | 28 +++++++++++++++++++++-
 .../java/org/apache/doris/qe/FEOpExecutor.java     |  7 ++++++
 .../org/apache/doris/qe/MysqlConnectProcessor.java | 28 +++++++++++++++++++---
 .../java/org/apache/doris/qe/StmtExecutor.java     | 17 +++++++++----
 gensrc/thrift/FrontendService.thrift               |  1 +
 .../suites/query_p0/test_forward_qeury.groovy      |  7 +++++-
 7 files changed, 86 insertions(+), 9 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 740c38579f0..ab45bc60f80 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -76,6 +76,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import io.netty.util.concurrent.FastThreadLocal;
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -83,6 +85,7 @@ import org.json.JSONObject;
 import org.xnio.StreamConnection;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -251,6 +254,10 @@ public class ConnectContext {
     // it's default thread-safe
     private boolean isProxy = false;
 
+    @Getter
+    @Setter
+    private ByteBuffer prepareExecuteBuffer;
+
     private MysqlHandshakePacket mysqlHandshakePacket;
 
     public void setUserQueryTimeout(int queryTimeout) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index cbc0aee98a5..06f5397d3dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -47,6 +47,7 @@ import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.MysqlChannel;
+import org.apache.doris.mysql.MysqlCommand;
 import org.apache.doris.mysql.MysqlPacket;
 import org.apache.doris.mysql.MysqlSerializer;
 import org.apache.doris.mysql.MysqlServerStatusFlag;
@@ -60,6 +61,7 @@ import org.apache.doris.nereids.parser.Dialect;
 import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.nereids.stats.StatsErrorEstimator;
 import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
+import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
 import org.apache.doris.plugin.DialectConverterPlugin;
@@ -86,6 +88,7 @@ import org.apache.thrift.TException;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -714,7 +717,24 @@ public abstract class ConnectProcessor {
                 UUID uuid = UUID.randomUUID();
                 queryId = new TUniqueId(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits());
             }
-            executor.queryRetry(queryId);
+            if (request.isSetPrepareExecuteBuffer()) {
+                ctx.setCommand(MysqlCommand.COM_STMT_PREPARE);
+                executor.execute();
+                ctx.setCommand(MysqlCommand.COM_STMT_EXECUTE);
+                String preparedStmtId = executor.getPrepareStmtName();
+                PreparedStatementContext preparedStatementContext = 
ctx.getPreparedStementContext(preparedStmtId);
+                if (preparedStatementContext == null) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Something error, just support nereids 
preparedStmtId:{}", preparedStmtId);
+                    }
+                    throw new RuntimeException("Prepare failed when proxy 
execute");
+                }
+                handleExecute(preparedStatementContext.command, 
Long.parseLong(preparedStmtId),
+                        preparedStatementContext,
+                        
ByteBuffer.wrap(request.getPrepareExecuteBuffer()).order(ByteOrder.LITTLE_ENDIAN),
 queryId);
+            } else {
+                executor.queryRetry(queryId);
+            }
         } catch (IOException e) {
             // Client failed.
             LOG.warn("Process one query failed because IOException: ", e);
@@ -784,4 +804,10 @@ public abstract class ConnectProcessor {
             throw new TException(e.getMessage());
         }
     }
+
+
+    protected void handleExecute(PrepareCommand prepareCommand, long stmtId, 
PreparedStatementContext prepCtx,
+            ByteBuffer packetBuf, TUniqueId queryId) {
+        throw new NotSupportedException("Just MysqlConnectProcessor support 
execute");
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
index 37fe527a137..ecf2f0c8428 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.ErrorCode;
+import org.apache.doris.mysql.MysqlCommand;
 import org.apache.doris.thrift.FrontendService;
 import org.apache.doris.thrift.TExpr;
 import org.apache.doris.thrift.TExprNode;
@@ -196,6 +197,12 @@ public class FEOpExecutor {
             
params.setTxnLoadInfo(ctx.getTxnEntry().getTxnLoadInfoInObserver());
         }
 
+        if (ctx.getCommand() == MysqlCommand.COM_STMT_EXECUTE) {
+            if (null != ctx.getPrepareExecuteBuffer()) {
+                params.setPrepareExecuteBuffer(ctx.getPrepareExecuteBuffer());
+            }
+        }
+
         return params;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java
index 97b5061a212..50990a753c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java
@@ -43,6 +43,7 @@ import 
org.apache.doris.nereids.trees.expressions.literal.Literal;
 import org.apache.doris.nereids.trees.plans.PlaceholderId;
 import org.apache.doris.nereids.trees.plans.commands.ExecuteCommand;
 import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
+import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -100,7 +101,18 @@ public class MysqlConnectProcessor extends 
ConnectProcessor {
         }
     }
 
-    private void handleExecute(PrepareCommand prepareCommand, long stmtId, 
PreparedStatementContext prepCtx) {
+    private String getHexStr(ByteBuffer packetBuf) {
+        byte[] bytes = packetBuf.array();
+        StringBuilder hex = new StringBuilder();
+        for (int i = packetBuf.position(); i < packetBuf.limit(); ++i) {
+            hex.append(String.format("%02X ", bytes[i]));
+        }
+        return hex.toString();
+    }
+
+    @Override
+    protected void handleExecute(PrepareCommand prepareCommand, long stmtId, 
PreparedStatementContext prepCtx,
+            ByteBuffer packetBuf, TUniqueId queryId) {
         int paramCount = prepareCommand.placeholderCount();
         LOG.debug("execute prepared statement {}, paramCount {}", stmtId, 
paramCount);
         // null bitmap
@@ -108,6 +120,12 @@ public class MysqlConnectProcessor extends 
ConnectProcessor {
         try {
             StatementContext statementContext = prepCtx.statementContext;
             if (paramCount > 0) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("execute param buf: {}, array: {}", packetBuf, 
getHexStr(packetBuf));
+                }
+                if (!ctx.isProxy()) {
+                    ctx.setPrepareExecuteBuffer(packetBuf.duplicate());
+                }
                 byte[] nullbitmapData = new byte[(paramCount + 7) / 8];
                 packetBuf.get(nullbitmapData);
                 // new_params_bind_flag
@@ -148,7 +166,11 @@ public class MysqlConnectProcessor extends 
ConnectProcessor {
             stmt.setOrigStmt(prepareCommand.getOriginalStmt());
             executor = new StmtExecutor(ctx, stmt);
             ctx.setExecutor(executor);
-            executor.execute();
+            if (null != queryId) {
+                executor.execute(queryId);
+            } else {
+                executor.execute();
+            }
             if (ctx.getSessionVariable().isEnablePreparedStmtAuditLog()) {
                 stmtStr = executeStmt.toSql();
                 stmtStr = stmtStr + " /*originalSql = " + 
prepareCommand.getOriginalStmt().originStmt + "*/";
@@ -191,7 +213,7 @@ public class MysqlConnectProcessor extends ConnectProcessor 
{
                     "msg: Not supported such prepared statement");
             return;
         }
-        handleExecute(preparedStatementContext.command, stmtId, 
preparedStatementContext);
+        handleExecute(preparedStatementContext.command, stmtId, 
preparedStatementContext, packetBuf, null);
     }
 
     // Process COM_QUERY statement,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 9f044d2c370..94e3eccba3d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -146,6 +146,7 @@ import org.apache.doris.nereids.exceptions.ParseException;
 import org.apache.doris.nereids.glue.LogicalPlanAdapter;
 import org.apache.doris.nereids.minidump.MinidumpUtils;
 import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.expressions.Placeholder;
 import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.plans.commands.Command;
@@ -282,6 +283,7 @@ public class StmtExecutor {
     private Data.PQueryStatistics.Builder statisticsForAuditLog;
     private boolean isCached;
     private String stmtName;
+    private String prepareStmtName; // for proxy
     private String mysqlLoadId;
     // Handle selects that fe can do without be
     private boolean isHandleQueryInFe = false;
@@ -704,8 +706,12 @@ public class StmtExecutor {
             }
             long stmtId = Config.prepared_stmt_start_id > 0
                     ? Config.prepared_stmt_start_id : 
context.getPreparedStmtId();
-            logicalPlan = new PrepareCommand(String.valueOf(stmtId),
-                    logicalPlan, statementContext.getPlaceholders(), 
originStmt);
+            this.prepareStmtName = String.valueOf(stmtId);
+            // When proxy executing, this.statementContext is created in 
constructor.
+            // But context.statementContext is created in LogicalPlanBuilder.
+            List<Placeholder> placeholders = context == null
+                    ? statementContext.getPlaceholders() : 
context.getStatementContext().getPlaceholders();
+            logicalPlan = new PrepareCommand(prepareStmtName, logicalPlan, 
placeholders, originStmt);
         }
         // when we in transaction mode, we only support insert into command 
and transaction command
         if (context.isTxnModel()) {
@@ -726,8 +732,7 @@ public class StmtExecutor {
                     if (logicalPlan instanceof InsertIntoTableCommand) {
                         profileType = ProfileType.LOAD;
                     }
-                    if (context.getCommand() == MysqlCommand.COM_STMT_PREPARE
-                            || context.getCommand() == 
MysqlCommand.COM_STMT_EXECUTE) {
+                    if (context.getCommand() == MysqlCommand.COM_STMT_PREPARE) 
{
                         throw new UserException("Forward master command is not 
supported for prepare statement");
                     }
                     if (isProxy) {
@@ -3683,4 +3688,8 @@ public class StmtExecutor {
             context.getMysqlChannel().sendOnePacket(byteBuffer);
         }
     }
+
+    public String getPrepareStmtName() {
+        return this.prepareStmtName;
+    }
 }
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 19d91f0fded..00f134e3edc 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -597,6 +597,7 @@ struct TMasterOpRequest {
     // transaction load
     29: optional TTxnLoadInfo txnLoadInfo
     30: optional TGroupCommitInfo groupCommitInfo
+    31: optional binary prepareExecuteBuffer
 
     // selectdb cloud
     1000: optional string cloud_cluster
diff --git a/regression-test/suites/query_p0/test_forward_qeury.groovy 
b/regression-test/suites/query_p0/test_forward_qeury.groovy
index d4761c835a2..e2b11e9535f 100644
--- a/regression-test/suites/query_p0/test_forward_qeury.groovy
+++ b/regression-test/suites/query_p0/test_forward_qeury.groovy
@@ -43,7 +43,12 @@ suite("test_forward_query", 'docker') {
 
         cluster.injectDebugPoints(NodeType.FE, 
['StmtExecutor.forward_all_queries' : [forwardAllQueries:true, execute:1]])
 
-        def ret = sql """ SELECT * FROM ${tbl} """
+        def stmt = prepareStatement("""INSERT INTO ${tbl} VALUES(?);""")
+        stmt.setInt(1, 2)
+        stmt.executeUpdate()
+
+        def ret = sql """ SELECT * FROM ${tbl} order by k1"""
         assertEquals(ret[0][0], 1)
+        assertEquals(ret[1][0], 2)
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to