This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 91f0301b434 [fix](group commit) Pick some group commit pr (#38320)
91f0301b434 is described below

commit 91f0301b434facb5167eaca4b13e2609e01c5b50
Author: meiyi <[email protected]>
AuthorDate: Thu Jul 25 17:32:44 2024 +0800

    [fix](group commit) Pick some group commit pr (#38320)
    
    Cherry-pick https://github.com/apache/doris/pull/38292,
    https://github.com/apache/doris/pull/34021 and
    https://github.com/apache/doris/pull/38228 in full, plus part of the
    changes from https://github.com/apache/doris/pull/37260 and
    https://github.com/apache/doris/pull/37595.
---
 be/src/runtime/group_commit_mgr.cpp                | 25 +++++++++------
 be/src/runtime/group_commit_mgr.h                  |  2 ++
 be/src/service/internal_service.cpp                |  4 +++
 be/src/vec/sink/group_commit_block_sink.cpp        | 14 +++++++-
 .../doris/alter/MaterializedViewHandler.java       | 15 ++++-----
 .../java/org/apache/doris/qe/StmtExecutor.java     |  6 ++++
 gensrc/proto/internal_service.proto                |  1 +
 .../insert_p0/insert_group_commit_into.groovy      | 37 +++++++++++++++++++---
 ...nsert_group_commit_into_max_filter_ratio.groovy |  2 +-
 .../test_group_commit_stream_load.groovy           |  2 +-
 10 files changed, 82 insertions(+), 26 deletions(-)

diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 6bbbe88d028..7bb30b1cc8b 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -130,16 +130,23 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
             }
         } else {
             if (duration >= 10 * _group_commit_interval_ms) {
-                std::stringstream ss;
-                ss << "[";
-                for (auto& id : _load_ids) {
-                    ss << id.to_string() << ", ";
+                auto last_print_duration =
+                        std::chrono::duration_cast<std::chrono::milliseconds>(
+                                std::chrono::steady_clock::now() - 
_last_print_time)
+                                .count();
+                if (last_print_duration >= 10000) {
+                    _last_print_time = std::chrono::steady_clock::now();
+                    std::stringstream ss;
+                    ss << "[";
+                    for (auto& id : _load_ids) {
+                        ss << id.to_string() << ", ";
+                    }
+                    ss << "]";
+                    LOG(INFO) << "find one group_commit need to commit, 
txn_id=" << txn_id
+                              << ", label=" << label << ", instance_id=" << 
load_instance_id
+                              << ", duration=" << duration << ", load_ids=" << 
ss.str()
+                              << ", runtime_state=" << runtime_state;
                 }
-                ss << "]";
-                LOG(INFO) << "find one group_commit need to commit, txn_id=" 
<< txn_id
-                          << ", label=" << label << ", instance_id=" << 
load_instance_id
-                          << ", duration=" << duration << ", load_ids=" << 
ss.str()
-                          << ", runtime_state=" << runtime_state;
             }
         }
         _get_cond.wait_for(l, std::chrono::milliseconds(
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index c41f6abd6fe..49ca0b3b505 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -62,6 +62,7 @@ public:
               
wait_internal_group_commit_finish(wait_internal_group_commit_finish),
               _group_commit_interval_ms(group_commit_interval_ms),
               _start_time(std::chrono::steady_clock::now()),
+              _last_print_time(_start_time),
               _group_commit_data_bytes(group_commit_data_bytes),
               _all_block_queues_bytes(all_block_queues_bytes) {};
 
@@ -112,6 +113,7 @@ private:
     // commit by time interval, can be changed by 'ALTER TABLE my_table SET 
("group_commit_interval_ms"="1000");'
     int64_t _group_commit_interval_ms;
     std::chrono::steady_clock::time_point _start_time;
+    std::chrono::steady_clock::time_point _last_print_time;
     // commit by data size
     int64_t _group_commit_data_bytes;
     int64_t _data_bytes = 0;
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index bd88c432b53..3be1d2eefdf 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -2250,6 +2250,10 @@ void PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController*
                             
response->set_loaded_rows(state->num_rows_load_success());
                             
response->set_filtered_rows(state->num_rows_load_filtered());
                             status->to_protobuf(response->mutable_status());
+                            if (!state->get_error_log_file_path().empty()) {
+                                response->set_error_url(
+                                        
to_load_error_http_path(state->get_error_log_file_path()));
+                            }
                             _exec_env->new_load_stream_mgr()->remove(load_id);
                         });
             } catch (const Exception& e) {
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp
index a46be4760e7..3fb36922415 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -167,13 +167,25 @@ Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block* input_
         for (int index = 0; index < rows; index++) {
             _vpartition->find_partition(block.get(), index, 
_partitions[index]);
         }
+        bool stop_processing = false;
         for (int row_index = 0; row_index < rows; row_index++) {
             if (_partitions[row_index] == nullptr) [[unlikely]] {
                 _filter_bitmap.Set(row_index, true);
                 LOG(WARNING) << "no partition for this tuple. tuple="
                              << block->dump_data(row_index, 1);
+                RETURN_IF_ERROR(state->append_error_msg_to_file(
+                        []() -> std::string { return ""; },
+                        [&]() -> std::string {
+                            fmt::memory_buffer buf;
+                            fmt::format_to(buf, "no partition for this tuple. 
tuple=\n{}",
+                                           block->dump_data(row_index, 1));
+                            return fmt::to_string(buf);
+                        },
+                        &stop_processing));
+                _has_filtered_rows = true;
+                state->update_num_rows_load_filtered(1);
+                state->update_num_rows_load_total(-1);
             }
-            _has_filtered_rows = true;
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index a6a132e316c..41b14268136 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -183,6 +183,9 @@ public class MaterializedViewHandler extends AlterHandler {
      */
     public void processCreateMaterializedView(CreateMaterializedViewStmt 
addMVClause, Database db, OlapTable olapTable)
             throws DdlException, AnalysisException {
+        // wait wal delete
+        
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
+        
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
         olapTable.writeLockOrDdlException();
         try {
             olapTable.checkNormalStateForAlter();
@@ -217,11 +220,6 @@ public class MaterializedViewHandler extends AlterHandler {
             addAlterJobV2(rollupJobV2);
 
             olapTable.setState(OlapTableState.ROLLUP);
-
-            // wait wal delete
-            
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
-            
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
-
             Env.getCurrentEnv().getEditLog().logAlterJob(rollupJobV2);
             LOG.info("finished to create materialized view job: {}", 
rollupJobV2.getJobId());
         } finally {
@@ -244,6 +242,9 @@ public class MaterializedViewHandler extends AlterHandler {
     public void processBatchAddRollup(String rawSql, List<AlterClause> 
alterClauses, Database db, OlapTable olapTable)
             throws DdlException, AnalysisException {
         checkReplicaCount(olapTable);
+        // wait wal delete
+        
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
+        
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
         Map<String, RollupJobV2> rollupNameJobMap = new LinkedHashMap<>();
         // save job id for log
         Set<Long> logJobIdSet = new HashSet<>();
@@ -305,10 +306,6 @@ public class MaterializedViewHandler extends AlterHandler {
             // but this order is more reasonable
             olapTable.setState(OlapTableState.ROLLUP);
 
-            // wait wal delete
-            
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
-            
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
-
             // 2 batch submit rollup job
             List<AlterJobV2> rollupJobV2List = new 
ArrayList<>(rollupNameJobMap.values());
             batchAddAlterJobV2(rollupJobV2List);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 4e658cf40ca..276ba93dba7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -2207,11 +2207,17 @@ public class StmtExecutor {
                         errMsg = "group commit insert failed. db_id: " + dbId 
+ ", table_id: " + tableId
                                 + ", query_id: " + 
DebugUtil.printId(context.queryId()) + ", backend_id: "
                                 + groupCommitPlanner.getBackend().getId() + ", 
status: " + response.getStatus();
+                        if (response.hasErrorUrl()) {
+                            errMsg += ", error url: " + response.getErrorUrl();
+                        }
                     }
                 } else if (code != TStatusCode.OK) {
                     errMsg = "group commit insert failed. db_id: " + dbId + ", 
table_id: " + tableId + ", query_id: "
                             + DebugUtil.printId(context.queryId()) + ", 
backend_id: " + groupCommitPlanner.getBackend()
                             .getId() + ", status: " + response.getStatus();
+                    if (response.hasErrorUrl()) {
+                        errMsg += ", error url: " + response.getErrorUrl();
+                    }
                     ErrorReport.reportDdlException(errMsg, 
ErrorCode.ERR_FAILED_WHEN_INSERT);
                 }
                 label = response.getLabel();
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index a0b0aac6e5a..9b2f106a1f4 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -816,6 +816,7 @@ message PGroupCommitInsertResponse {
     optional int64 txn_id = 3;
     optional int64 loaded_rows = 4;
     optional int64 filtered_rows = 5;
+    optional string error_url = 6;
 }
 
 message POpenLoadStreamRequest {
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index 5c60ff82bb8..371e392fcc3 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -69,6 +69,24 @@ suite("insert_group_commit_into") {
         return serverInfo
     }
 
+    def group_commit_insert_with_retry = { sql, expected_row_count ->
+        def retry = 0
+        while (true){
+            try {
+                return group_commit_insert(sql, expected_row_count)
+            } catch (Exception e) {
+                logger.warn("group_commit_insert failed, retry: " + retry + ", 
error: " + e.getMessage())
+                retry++
+                if (e.getMessage().contains("is blocked on schema change") && 
retry < 20) {
+                    sleep(1500)
+                    continue
+                } else {
+                    throw e
+                }
+            }
+        }
+    }
+
     def none_group_commit_insert = { sql, expected_row_count ->
         def stmt = prepareStatement """ ${sql}  """
         def result = stmt.executeUpdate()
@@ -186,10 +204,19 @@ suite("insert_group_commit_into") {
                 // 7. insert into and add rollup
                 group_commit_insert """ insert into ${table}(name, id) 
values('c', 3);  """, 1
                 group_commit_insert """ insert into ${table}(id) values(4);  
""", 1
+                sql "set enable_insert_strict=false"
                 group_commit_insert """ insert into ${table} values (1, 'a', 
10),(5, 'q', 50),(101, 'a', 100);  """, 2
-                // sql """ alter table ${table} ADD ROLLUP r1(name, score); """
-                group_commit_insert """ insert into ${table}(id, name) 
values(2, 'b');  """, 1
-                group_commit_insert """ insert into ${table}(id) select 6; 
""", 1
+                sql "set enable_insert_strict=true"
+                try {
+                    sql """ insert into ${table} values (102, 'a', 100);  """
+                    assertTrue(false, "insert should fail")
+                } catch (Exception e) {
+                    logger.info("error: " + e.getMessage())
+                    assertTrue(e.getMessage().contains("url:"))
+                }
+                sql """ alter table ${table} ADD ROLLUP r1(name, score); """
+                group_commit_insert_with_retry """ insert into ${table}(id, 
name) values(2, 'b');  """, 1
+                group_commit_insert_with_retry """ insert into ${table}(id) 
select 6; """, 1
 
                 getRowCount(20)
                 qt_sql """ select name, score from ${table} order by name asc; 
"""
@@ -237,7 +264,7 @@ suite("insert_group_commit_into") {
 
                         // 1. insert into
                         def server_info = group_commit_insert """ insert into 
${table}(name, id) values('c', 3);  """, 1
-                        assertTrue(server_info.contains('query_id'))
+                        /*assertTrue(server_info.contains('query_id'))
                         // get query_id, such as 
43f87963586a482a-b0496bcf9e2b5555
                         def query_id_index = 
server_info.indexOf("'query_id':'") + "'query_id':'".length()
                         def query_id = server_info.substring(query_id_index, 
query_id_index + 33)
@@ -255,7 +282,7 @@ suite("insert_group_commit_into") {
                         logger.info("Get profile: code=" + code + ", out=" + 
out + ", err=" + err)
                         assertEquals(code, 0)
                         def json = parseJson(out)
-                        assertEquals("success", json.msg.toLowerCase())
+                        assertEquals("success", json.msg.toLowerCase())*/
                     }
                 }
             } else {
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy b/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy
index 51264d3d863..5d6339d0894 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy
@@ -216,7 +216,7 @@ suite("insert_group_commit_into_max_filter_ratio") {
 
             sql """ set group_commit = async_mode; """
             sql """ set enable_insert_strict = false; """
-            group_commit_insert """ insert into ${dbTableName} values (9, 'a', 
'a'); """, 0
+            group_commit_insert """ insert into ${dbTableName} values (9, 'a', 
'a'); """, 1
         }
         if (item == "nereids") {
             get_row_count_with_retry(6)
diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
index e12a1f2f01b..0f823f1d4ef 100644
--- a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
@@ -194,7 +194,7 @@ suite("test_group_commit_stream_load") {
             time 10000 // limit inflight 10s
 
             check { result, exception, startTime, endTime ->
-                checkStreamLoadResult(exception, result, 6, 3, 2, 1)
+                checkStreamLoadResult(exception, result, 6, 2, 3, 1)
             }
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to