This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ca47d75e834 [fix](regression) Add regression for group commit executed
on observe… (#26692)
ca47d75e834 is described below
commit ca47d75e834a204010ff12082816fc446aa6635d
Author: meiyi <[email protected]>
AuthorDate: Fri Nov 10 18:53:45 2023 +0800
[fix](regression) Add regression for group commit executed on observe…
(#26692)
---
.../java/org/apache/doris/qe/StmtExecutor.java | 5 ++
.../insert_p0/insert_group_commit_into.groovy | 53 +++++++++++++++++++++-
2 files changed, 57 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 5fa8a8205cb..0c3902372df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1813,6 +1813,7 @@ public class StmtExecutor {
TransactionStatus txnStatus = TransactionStatus.ABORTED;
String errMsg = "";
TableType tblType = insertStmt.getTargetTable().getType();
+ boolean isGroupCommit = false;
if (context.isTxnModel()) {
if (insertStmt.getQueryStmt() instanceof SelectStmt) {
if (((SelectStmt)
insertStmt.getQueryStmt()).getTableRefs().size() > 0) {
@@ -1824,6 +1825,7 @@ public class StmtExecutor {
label = context.getTxnEntry().getLabel();
txnId = context.getTxnEntry().getTxnConf().getTxnId();
} else if (insertStmt instanceof NativeInsertStmt &&
((NativeInsertStmt) insertStmt).isGroupCommit()) {
+ isGroupCommit = true;
NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) insertStmt;
int maxRetry = 3;
for (int i = 0; i < maxRetry; i++) {
@@ -2013,6 +2015,9 @@ public class StmtExecutor {
if (!Strings.isNullOrEmpty(errMsg)) {
sb.append(", 'err':'").append(errMsg).append("'");
}
+ if (isGroupCommit) {
+ sb.append(",
'query_id':'").append(DebugUtil.printId(context.queryId)).append("'");
+ }
sb.append("}");
context.getState().setOk(loadedRows, filteredRows, sb.toString());
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index bea130d1067..736e774fbb1 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -16,6 +16,7 @@
// under the License.
import com.mysql.cj.jdbc.StatementImpl
+import org.codehaus.groovy.runtime.IOGroovyMethods
suite("insert_group_commit_into") {
def dbName = "regression_test_insert_p0"
@@ -65,6 +66,7 @@ suite("insert_group_commit_into") {
// assertEquals(result, expected_row_count)
assertTrue(serverInfo.contains("'status':'PREPARE'"))
assertTrue(serverInfo.contains("'label':'group_commit_"))
+ return serverInfo
}
def none_group_commit_insert = { sql, expected_row_count ->
@@ -211,6 +213,55 @@ suite("insert_group_commit_into") {
// try_sql("DROP TABLE ${table}")
}
+ // test connect to observer fe
+ try {
+ def fes = sql_return_maparray "show frontends"
+ logger.info("frontends: ${fes}")
+ if (fes.size() > 1) {
+ def observer_fe = null
+ for (def fe : fes) {
+ if (fe.IsMaster == "false") {
+ observer_fe = fe
+ break
+ }
+ }
+ if (observer_fe != null) {
+ def url =
"jdbc:mysql://${observer_fe.Host}:${observer_fe.QueryPort}/"
+ logger.info("observer url: " + url)
+ connect(user = context.config.jdbcUser, password =
context.config.jdbcPassword, url = url) {
+ sql """ set enable_insert_group_commit = true; """
+ sql """ set enable_nereids_dml = false; """
+ sql """ set enable_profile= true; """
+
+ // 1. insert into
+ def server_info = group_commit_insert """ insert into
${table}(name, id) values('c', 3); """, 1
+ assertTrue(server_info.contains('query_id'))
+ // get query_id, such as
43f87963586a482a-b0496bcf9e2b5555
+ def query_id_index =
server_info.indexOf("'query_id':'") + "'query_id':'".length()
+ def query_id = server_info.substring(query_id_index,
query_id_index + 33)
+ logger.info("query_id: " + query_id)
+ // 2. check profile
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X GET -u
${context.config.jdbcUser}:${context.config.jdbcPassword}
http://${observer_fe.Host}:${observer_fe.HttpPort}")
+ sb.append("/api/profile?query_id=").append(query_id)
+ String command = sb.toString()
+ logger.info(command)
+ def process = command.execute()
+ def code = process.waitFor()
+ def err = IOGroovyMethods.getText(new
BufferedReader(new InputStreamReader(process.getErrorStream())));
+ def out = process.getText()
+ logger.info("Get profile: code=" + code + ", out=" +
out + ", err=" + err)
+ assertEquals(code, 0)
+ def json = parseJson(out)
+ assertEquals("success", json.msg.toLowerCase())
+ }
+ }
+ } else {
+ logger.info("only one fe, skip test connect to observer fe")
+ }
+ } finally {
+ }
+
// table with array type
tableName = "insert_group_commit_into_duplicate_array"
table = dbName + "." + tableName
@@ -266,4 +317,4 @@ suite("insert_group_commit_into") {
// try_sql("DROP TABLE ${table}")
}
}
-}
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]