This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 91f0301b434 [fix](group commit) Pick some group commit pr (#38320)
91f0301b434 is described below
commit 91f0301b434facb5167eaca4b13e2609e01c5b50
Author: meiyi <[email protected]>
AuthorDate: Thu Jul 25 17:32:44 2024 +0800
[fix](group commit) Pick some group commit pr (#38320)
Pick https://github.com/apache/doris/pull/38292,
https://github.com/apache/doris/pull/34021, and
https://github.com/apache/doris/pull/38228, plus some of the modifications from
https://github.com/apache/doris/pull/37260 and
https://github.com/apache/doris/pull/37595
---
be/src/runtime/group_commit_mgr.cpp | 25 +++++++++------
be/src/runtime/group_commit_mgr.h | 2 ++
be/src/service/internal_service.cpp | 4 +++
be/src/vec/sink/group_commit_block_sink.cpp | 14 +++++++-
.../doris/alter/MaterializedViewHandler.java | 15 ++++-----
.../java/org/apache/doris/qe/StmtExecutor.java | 6 ++++
gensrc/proto/internal_service.proto | 1 +
.../insert_p0/insert_group_commit_into.groovy | 37 +++++++++++++++++++---
...nsert_group_commit_into_max_filter_ratio.groovy | 2 +-
.../test_group_commit_stream_load.groovy | 2 +-
10 files changed, 82 insertions(+), 26 deletions(-)
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 6bbbe88d028..7bb30b1cc8b 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -130,16 +130,23 @@ Status LoadBlockQueue::get_block(RuntimeState*
runtime_state, vectorized::Block*
}
} else {
if (duration >= 10 * _group_commit_interval_ms) {
- std::stringstream ss;
- ss << "[";
- for (auto& id : _load_ids) {
- ss << id.to_string() << ", ";
+ auto last_print_duration =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now() -
_last_print_time)
+ .count();
+ if (last_print_duration >= 10000) {
+ _last_print_time = std::chrono::steady_clock::now();
+ std::stringstream ss;
+ ss << "[";
+ for (auto& id : _load_ids) {
+ ss << id.to_string() << ", ";
+ }
+ ss << "]";
+ LOG(INFO) << "find one group_commit need to commit,
txn_id=" << txn_id
+ << ", label=" << label << ", instance_id=" <<
load_instance_id
+ << ", duration=" << duration << ", load_ids=" <<
ss.str()
+ << ", runtime_state=" << runtime_state;
}
- ss << "]";
- LOG(INFO) << "find one group_commit need to commit, txn_id="
<< txn_id
- << ", label=" << label << ", instance_id=" <<
load_instance_id
- << ", duration=" << duration << ", load_ids=" <<
ss.str()
- << ", runtime_state=" << runtime_state;
}
}
_get_cond.wait_for(l, std::chrono::milliseconds(
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index c41f6abd6fe..49ca0b3b505 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -62,6 +62,7 @@ public:
wait_internal_group_commit_finish(wait_internal_group_commit_finish),
_group_commit_interval_ms(group_commit_interval_ms),
_start_time(std::chrono::steady_clock::now()),
+ _last_print_time(_start_time),
_group_commit_data_bytes(group_commit_data_bytes),
_all_block_queues_bytes(all_block_queues_bytes) {};
@@ -112,6 +113,7 @@ private:
// commit by time interval, can be changed by 'ALTER TABLE my_table SET
("group_commit_interval_ms"="1000");'
int64_t _group_commit_interval_ms;
std::chrono::steady_clock::time_point _start_time;
+ std::chrono::steady_clock::time_point _last_print_time;
// commit by data size
int64_t _group_commit_data_bytes;
int64_t _data_bytes = 0;
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index bd88c432b53..3be1d2eefdf 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -2250,6 +2250,10 @@ void
PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController*
response->set_loaded_rows(state->num_rows_load_success());
response->set_filtered_rows(state->num_rows_load_filtered());
status->to_protobuf(response->mutable_status());
+ if (!state->get_error_log_file_path().empty()) {
+ response->set_error_url(
+
to_load_error_http_path(state->get_error_log_file_path()));
+ }
_exec_env->new_load_stream_mgr()->remove(load_id);
});
} catch (const Exception& e) {
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp
index a46be4760e7..3fb36922415 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -167,13 +167,25 @@ Status GroupCommitBlockSink::send(RuntimeState* state,
vectorized::Block* input_
for (int index = 0; index < rows; index++) {
_vpartition->find_partition(block.get(), index,
_partitions[index]);
}
+ bool stop_processing = false;
for (int row_index = 0; row_index < rows; row_index++) {
if (_partitions[row_index] == nullptr) [[unlikely]] {
_filter_bitmap.Set(row_index, true);
LOG(WARNING) << "no partition for this tuple. tuple="
<< block->dump_data(row_index, 1);
+ RETURN_IF_ERROR(state->append_error_msg_to_file(
+ []() -> std::string { return ""; },
+ [&]() -> std::string {
+ fmt::memory_buffer buf;
+ fmt::format_to(buf, "no partition for this tuple.
tuple=\n{}",
+ block->dump_data(row_index, 1));
+ return fmt::to_string(buf);
+ },
+ &stop_processing));
+ _has_filtered_rows = true;
+ state->update_num_rows_load_filtered(1);
+ state->update_num_rows_load_total(-1);
}
- _has_filtered_rows = true;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index a6a132e316c..41b14268136 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -183,6 +183,9 @@ public class MaterializedViewHandler extends AlterHandler {
*/
public void processCreateMaterializedView(CreateMaterializedViewStmt
addMVClause, Database db, OlapTable olapTable)
throws DdlException, AnalysisException {
+ // wait wal delete
+
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
+
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
olapTable.writeLockOrDdlException();
try {
olapTable.checkNormalStateForAlter();
@@ -217,11 +220,6 @@ public class MaterializedViewHandler extends AlterHandler {
addAlterJobV2(rollupJobV2);
olapTable.setState(OlapTableState.ROLLUP);
-
- // wait wal delete
-
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
-
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
-
Env.getCurrentEnv().getEditLog().logAlterJob(rollupJobV2);
LOG.info("finished to create materialized view job: {}",
rollupJobV2.getJobId());
} finally {
@@ -244,6 +242,9 @@ public class MaterializedViewHandler extends AlterHandler {
public void processBatchAddRollup(String rawSql, List<AlterClause>
alterClauses, Database db, OlapTable olapTable)
throws DdlException, AnalysisException {
checkReplicaCount(olapTable);
+ // wait wal delete
+
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
+
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
Map<String, RollupJobV2> rollupNameJobMap = new LinkedHashMap<>();
// save job id for log
Set<Long> logJobIdSet = new HashSet<>();
@@ -305,10 +306,6 @@ public class MaterializedViewHandler extends AlterHandler {
// but this order is more reasonable
olapTable.setState(OlapTableState.ROLLUP);
- // wait wal delete
-
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
-
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
-
// 2 batch submit rollup job
List<AlterJobV2> rollupJobV2List = new
ArrayList<>(rollupNameJobMap.values());
batchAddAlterJobV2(rollupJobV2List);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 4e658cf40ca..276ba93dba7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -2207,11 +2207,17 @@ public class StmtExecutor {
errMsg = "group commit insert failed. db_id: " + dbId
+ ", table_id: " + tableId
+ ", query_id: " +
DebugUtil.printId(context.queryId()) + ", backend_id: "
+ groupCommitPlanner.getBackend().getId() + ",
status: " + response.getStatus();
+ if (response.hasErrorUrl()) {
+ errMsg += ", error url: " + response.getErrorUrl();
+ }
}
} else if (code != TStatusCode.OK) {
errMsg = "group commit insert failed. db_id: " + dbId + ",
table_id: " + tableId + ", query_id: "
+ DebugUtil.printId(context.queryId()) + ",
backend_id: " + groupCommitPlanner.getBackend()
.getId() + ", status: " + response.getStatus();
+ if (response.hasErrorUrl()) {
+ errMsg += ", error url: " + response.getErrorUrl();
+ }
ErrorReport.reportDdlException(errMsg,
ErrorCode.ERR_FAILED_WHEN_INSERT);
}
label = response.getLabel();
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index a0b0aac6e5a..9b2f106a1f4 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -816,6 +816,7 @@ message PGroupCommitInsertResponse {
optional int64 txn_id = 3;
optional int64 loaded_rows = 4;
optional int64 filtered_rows = 5;
+ optional string error_url = 6;
}
message POpenLoadStreamRequest {
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index 5c60ff82bb8..371e392fcc3 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -69,6 +69,24 @@ suite("insert_group_commit_into") {
return serverInfo
}
+ def group_commit_insert_with_retry = { sql, expected_row_count ->
+ def retry = 0
+ while (true){
+ try {
+ return group_commit_insert(sql, expected_row_count)
+ } catch (Exception e) {
+ logger.warn("group_commit_insert failed, retry: " + retry + ",
error: " + e.getMessage())
+ retry++
+ if (e.getMessage().contains("is blocked on schema change") &&
retry < 20) {
+ sleep(1500)
+ continue
+ } else {
+ throw e
+ }
+ }
+ }
+ }
+
def none_group_commit_insert = { sql, expected_row_count ->
def stmt = prepareStatement """ ${sql} """
def result = stmt.executeUpdate()
@@ -186,10 +204,19 @@ suite("insert_group_commit_into") {
// 7. insert into and add rollup
group_commit_insert """ insert into ${table}(name, id)
values('c', 3); """, 1
group_commit_insert """ insert into ${table}(id) values(4);
""", 1
+ sql "set enable_insert_strict=false"
group_commit_insert """ insert into ${table} values (1, 'a',
10),(5, 'q', 50),(101, 'a', 100); """, 2
- // sql """ alter table ${table} ADD ROLLUP r1(name, score); """
- group_commit_insert """ insert into ${table}(id, name)
values(2, 'b'); """, 1
- group_commit_insert """ insert into ${table}(id) select 6;
""", 1
+ sql "set enable_insert_strict=true"
+ try {
+ sql """ insert into ${table} values (102, 'a', 100); """
+ assertTrue(false, "insert should fail")
+ } catch (Exception e) {
+ logger.info("error: " + e.getMessage())
+ assertTrue(e.getMessage().contains("url:"))
+ }
+ sql """ alter table ${table} ADD ROLLUP r1(name, score); """
+ group_commit_insert_with_retry """ insert into ${table}(id,
name) values(2, 'b'); """, 1
+ group_commit_insert_with_retry """ insert into ${table}(id)
select 6; """, 1
getRowCount(20)
qt_sql """ select name, score from ${table} order by name asc;
"""
@@ -237,7 +264,7 @@ suite("insert_group_commit_into") {
// 1. insert into
def server_info = group_commit_insert """ insert into
${table}(name, id) values('c', 3); """, 1
- assertTrue(server_info.contains('query_id'))
+ /*assertTrue(server_info.contains('query_id'))
// get query_id, such as
43f87963586a482a-b0496bcf9e2b5555
def query_id_index =
server_info.indexOf("'query_id':'") + "'query_id':'".length()
def query_id = server_info.substring(query_id_index,
query_id_index + 33)
@@ -255,7 +282,7 @@ suite("insert_group_commit_into") {
logger.info("Get profile: code=" + code + ", out=" +
out + ", err=" + err)
assertEquals(code, 0)
def json = parseJson(out)
- assertEquals("success", json.msg.toLowerCase())
+ assertEquals("success", json.msg.toLowerCase())*/
}
}
} else {
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy b/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy
index 51264d3d863..5d6339d0894 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy
@@ -216,7 +216,7 @@ suite("insert_group_commit_into_max_filter_ratio") {
sql """ set group_commit = async_mode; """
sql """ set enable_insert_strict = false; """
- group_commit_insert """ insert into ${dbTableName} values (9, 'a',
'a'); """, 0
+ group_commit_insert """ insert into ${dbTableName} values (9, 'a',
'a'); """, 1
}
if (item == "nereids") {
get_row_count_with_retry(6)
diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
index e12a1f2f01b..0f823f1d4ef 100644
--- a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
@@ -194,7 +194,7 @@ suite("test_group_commit_stream_load") {
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
- checkStreamLoadResult(exception, result, 6, 3, 2, 1)
+ checkStreamLoadResult(exception, result, 6, 2, 3, 1)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]