This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 66fbb22ad74 [fix](group commit) Fix some wal problems on group commit (#28554)
66fbb22ad74 is described below

commit 66fbb22ad7447fa714da68f3c02598eada7a9cb4
Author: huanghaibin <[email protected]>
AuthorDate: Tue Dec 19 09:51:03 2023 +0800

    [fix](group commit) Fix some wal problems on group commit (#28554)
---
 be/src/common/config.cpp                           |   1 +
 be/src/common/config.h                             |   1 +
 be/src/olap/wal_manager.cpp                        |  34 ++++--
 be/src/olap/wal_manager.h                          |   3 +
 be/src/olap/wal_table.cpp                          | 119 +++++++++++++--------
 be/src/runtime/group_commit_mgr.cpp                |   8 +-
 be/src/vec/exec/format/wal/wal_reader.cpp          |   4 +-
 be/src/vec/exec/scan/vfile_scanner.cpp             |   2 +-
 be/src/vec/sink/group_commit_block_sink.cpp        |  28 ++++-
 be/src/vec/sink/group_commit_block_sink.h          |   5 +
 .../apache/doris/service/FrontendServiceImpl.java  |   9 +-
 .../ExternalFileTableValuedFunction.java           |   9 +-
 .../test_group_commit_http_stream.groovy           |   4 +-
 .../test_group_commit_stream_load.groovy           |   4 +-
 14 files changed, 160 insertions(+), 71 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f286c33d547..2722b5a8a16 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1109,6 +1109,7 @@ DEFINE_Int16(bitmap_serialize_version, "1");
 DEFINE_String(group_commit_replay_wal_dir, "./wal");
 DEFINE_Int32(group_commit_replay_wal_retry_num, "10");
 DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5");
+DEFINE_Int32(group_commit_relay_wal_threads, "10");
 
 // the count of thread to group commit insert
 DEFINE_Int32(group_commit_insert_threads, "10");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4054b315aa4..37020c56be4 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1182,6 +1182,7 @@ DECLARE_Int16(bitmap_serialize_version);
 DECLARE_String(group_commit_replay_wal_dir);
 DECLARE_Int32(group_commit_replay_wal_retry_num);
 DECLARE_Int32(group_commit_replay_wal_retry_interval_seconds);
+DECLARE_mInt32(group_commit_relay_wal_threads);
 
 // This config can be set to limit thread number in group commit insert thread 
pool.
 DECLARE_mInt32(group_commit_insert_threads);
diff --git a/be/src/olap/wal_manager.cpp b/be/src/olap/wal_manager.cpp
index abb7d8a324b..f03075609ea 100644
--- a/be/src/olap/wal_manager.cpp
+++ b/be/src/olap/wal_manager.cpp
@@ -42,6 +42,10 @@ WalManager::WalManager(ExecEnv* exec_env, const std::string& 
wal_dir_list)
     doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs);
     _all_wal_disk_bytes = std::make_shared<std::atomic_size_t>(0);
     _cv = std::make_shared<std::condition_variable>();
+    static_cast<void>(ThreadPoolBuilder("GroupCommitReplayWalThreadPool")
+                              .set_min_threads(1)
+                              
.set_max_threads(config::group_commit_relay_wal_threads)
+                              .build(&_thread_pool));
 }
 
 WalManager::~WalManager() {
@@ -56,6 +60,7 @@ void WalManager::stop() {
         if (_replay_thread) {
             _replay_thread->join();
         }
+        _thread_pool->shutdown();
         LOG(INFO) << "WalManager is stopped";
     }
 }
@@ -161,6 +166,10 @@ Status WalManager::add_wal_path(int64_t db_id, int64_t 
table_id, int64_t wal_id,
        << std::to_string(wal_id) << "_" << label;
     {
         std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
+        auto it = _wal_path_map.find(wal_id);
+        if (it != _wal_path_map.end()) {
+            return Status::InternalError("wal_id {} already in wal_path_map", 
wal_id);
+        }
         _wal_path_map.emplace(wal_id, ss.str());
     }
     return Status::OK();
@@ -299,10 +308,12 @@ Status WalManager::replay() {
             }
         }
         for (const auto& table_id : replay_tables) {
-            auto st = _table_map[table_id]->replay_wals();
-            if (!st.ok()) {
-                LOG(WARNING) << "Failed add replay wal on table " << table_id;
-            }
+            RETURN_IF_ERROR(_thread_pool->submit_func([table_id, this] {
+                auto st = this->_table_map[table_id]->replay_wals();
+                if (!st.ok()) {
+                    LOG(WARNING) << "Failed add replay wal on table " << 
table_id;
+                }
+            }));
         }
     } while (!_stop_background_threads_latch.wait_for(
             
std::chrono::seconds(config::group_commit_replay_wal_retry_interval_seconds)));
@@ -351,10 +362,13 @@ Status WalManager::delete_wal(int64_t wal_id) {
         if (_wal_id_to_writer_map.empty()) {
             CHECK_EQ(_all_wal_disk_bytes->load(std::memory_order_relaxed), 0);
         }
-        std::string wal_path = _wal_path_map[wal_id];
-        RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(wal_path));
-        LOG(INFO) << "delete file=" << wal_path;
-        _wal_path_map.erase(wal_id);
+        auto it = _wal_path_map.find(wal_id);
+        if (it != _wal_path_map.end()) {
+            std::string wal_path = it->second;
+            
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(wal_path));
+            LOG(INFO) << "delete file=" << wal_path;
+            _wal_path_map.erase(wal_id);
+        }
     }
     return Status::OK();
 }
@@ -371,10 +385,13 @@ void WalManager::stop_relay_wal() {
 }
 
 void WalManager::add_wal_column_index(int64_t wal_id, std::vector<size_t>& 
column_index) {
+    std::lock_guard<std::shared_mutex> wrlock(_wal_column_id_map_lock);
     _wal_column_id_map.emplace(wal_id, column_index);
+    LOG(INFO) << "add " << wal_id << " to wal_column_id_map";
 }
 
 void WalManager::erase_wal_column_index(int64_t wal_id) {
+    std::lock_guard<std::shared_mutex> wrlock(_wal_column_id_map_lock);
     if (_wal_column_id_map.erase(wal_id)) {
         LOG(INFO) << "erase " << wal_id << " from wal_column_id_map";
     } else {
@@ -383,6 +400,7 @@ void WalManager::erase_wal_column_index(int64_t wal_id) {
 }
 
 Status WalManager::get_wal_column_index(int64_t wal_id, std::vector<size_t>& 
column_index) {
+    std::lock_guard<std::shared_mutex> wrlock(_wal_column_id_map_lock);
     auto it = _wal_column_id_map.find(wal_id);
     if (it != _wal_column_id_map.end()) {
         column_index = it->second;
diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h
index d0a547a8d6f..fdeadbee1d0 100644
--- a/be/src/olap/wal_manager.h
+++ b/be/src/olap/wal_manager.h
@@ -31,6 +31,7 @@
 #include "runtime/exec_env.h"
 #include "runtime/stream_load/stream_load_context.h"
 #include "util/thread.h"
+#include "util/threadpool.h"
 
 namespace doris {
 class WalManager {
@@ -86,7 +87,9 @@ private:
     std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes;
     std::unordered_map<int64_t, std::unordered_map<int64_t, WAL_STATUS>> 
_wal_status_queues;
     std::atomic<bool> _stop;
+    std::shared_mutex _wal_column_id_map_lock;
     std::unordered_map<int64_t, std::vector<size_t>&> _wal_column_id_map;
     std::shared_ptr<std::condition_variable> _cv;
+    std::unique_ptr<doris::ThreadPool> _thread_pool;
 };
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal_table.cpp b/be/src/olap/wal_table.cpp
index 4bfcec502a8..54d158e7a95 100644
--- a/be/src/olap/wal_table.cpp
+++ b/be/src/olap/wal_table.cpp
@@ -48,6 +48,8 @@ WalTable::~WalTable() {}
 std::string k_request_line;
 #endif
 
+bool retry = false;
+
 void WalTable::add_wals(std::vector<std::string> wals) {
     std::lock_guard<std::mutex> lock(_replay_wal_lock);
     for (const auto& wal : wals) {
@@ -57,6 +59,7 @@ void WalTable::add_wals(std::vector<std::string> wals) {
 }
 Status WalTable::replay_wals() {
     std::vector<std::string> need_replay_wals;
+    std::vector<std::string> need_erase_wals;
     {
         std::lock_guard<std::mutex> lock(_replay_wal_lock);
         if (_replay_wal_map.empty()) {
@@ -76,7 +79,7 @@ Status WalTable::replay_wals() {
                 std::string rename_path = _get_tmp_path(wal);
                 LOG(INFO) << "rename wal from " << wal << " to " << 
rename_path;
                 std::rename(wal.c_str(), rename_path.c_str());
-                _replay_wal_map.erase(wal);
+                need_erase_wals.push_back(wal);
                 continue;
             }
             if (_need_replay(info)) {
@@ -84,6 +87,13 @@ Status WalTable::replay_wals() {
             }
         }
         std::sort(need_replay_wals.begin(), need_replay_wals.end());
+        for (const auto& wal : need_erase_wals) {
+            if (_replay_wal_map.erase(wal)) {
+                LOG(INFO) << "erase wal " << wal << " from _replay_wal_map";
+            } else {
+                LOG(WARNING) << "fail to erase wal " << wal << " from 
_replay_wal_map";
+            }
+        }
     }
     for (const auto& wal : need_replay_wals) {
         {
@@ -216,7 +226,49 @@ Status WalTable::_get_wal_info(const std::string& wal,
 }
 
 void http_request_done(struct evhttp_request* req, void* arg) {
-    event_base_loopbreak((struct event_base*)arg);
+    std::stringstream out;
+    std::string status;
+    std::string msg;
+    std::string wal_id;
+    size_t len = 0;
+    if (req != nullptr) {
+        auto input = evhttp_request_get_input_buffer(req);
+        char* request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);
+        while (request_line != nullptr) {
+            std::string s(request_line);
+            out << request_line;
+            request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);
+        }
+        auto out_str = out.str();
+        LOG(INFO) << "replay wal out_str:" << out_str;
+        rapidjson::Document doc;
+        if (!out_str.empty()) {
+            doc.Parse(out.str().c_str());
+            status = std::string(doc["Status"].GetString());
+            msg = std::string(doc["Message"].GetString());
+            LOG(INFO) << "replay wal status:" << status << ",msg:" << msg;
+            if (status.find("Fail") != status.npos) {
+                if (msg.find("Label") != msg.npos &&
+                    msg.find("has already been used") != msg.npos) {
+                    retry = false;
+                } else {
+                    retry = true;
+                }
+            } else {
+                retry = false;
+            }
+        } else {
+            retry = true;
+        }
+    } else {
+        LOG(WARNING) << "req is null";
+    }
+
+    if (arg != nullptr) {
+        event_base_loopbreak((struct event_base*)arg);
+    } else {
+        LOG(WARNING) << "arg is null";
+    }
 }
 
 Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const 
std::string& label) {
@@ -224,6 +276,7 @@ Status WalTable::_send_request(int64_t wal_id, const 
std::string& wal, const std
     struct event_base* base = nullptr;
     struct evhttp_connection* conn = nullptr;
     struct evhttp_request* req = nullptr;
+    retry = false;
     event_init();
     base = event_base_new();
     conn = evhttp_connection_new("127.0.0.1", doris::config::webserver_port);
@@ -239,18 +292,20 @@ Status WalTable::_send_request(int64_t wal_id, const 
std::string& wal, const std
     std::vector<size_t> index_vector;
     std::stringstream ss_name;
     std::stringstream ss_id;
-    int index = 0;
+    int index_raw = 0;
     for (auto column_id_str : column_id_element) {
         try {
             int64_t column_id = std::strtoll(column_id_str.c_str(), NULL, 10);
             auto it = _column_id_name_map.find(column_id);
-            if (it != _column_id_name_map.end()) {
-                ss_name << it->second << ",";
+            auto it2 = _column_id_index_map.find(column_id);
+            if (it != _column_id_name_map.end() && it2 != 
_column_id_index_map.end()) {
+                ss_name << "`" << it->second << "`,";
                 ss_id << "c" << 
std::to_string(_column_id_index_map[column_id]) << ",";
-                index_vector.emplace_back(index);
+                index_vector.emplace_back(index_raw);
                 _column_id_name_map.erase(column_id);
+                _column_id_index_map.erase(column_id);
             }
-            index++;
+            index_raw++;
         } catch (const std::invalid_argument& e) {
             return Status::InvalidArgument("Invalid format, {}", e.what());
         }
@@ -273,44 +328,21 @@ Status WalTable::_send_request(int64_t wal_id, const 
std::string& wal, const std
     evhttp_connection_free(conn);
     event_base_free(base);
 
-#endif
-    bool retry = false;
-    std::string status;
-    std::string msg;
-    std::stringstream out;
-    rapidjson::Document doc;
-#ifndef BE_TEST
-    size_t len = 0;
-    auto input = evhttp_request_get_input_buffer(req);
-    char* request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);
-    while (request_line != nullptr) {
-        std::string s(request_line);
-        out << request_line;
-        request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);
-    }
 #else
+    std::stringstream out;
     out << k_request_line;
-#endif
     auto out_str = out.str();
-    if (!out_str.empty()) {
-        doc.Parse(out.str().c_str());
-        status = std::string(doc["Status"].GetString());
-        msg = std::string(doc["Message"].GetString());
-        LOG(INFO) << "replay wal " << wal_id << " status:" << status << 
",msg:" << msg;
-        if (status.find("Fail") != status.npos) {
-            if (msg.find("Label") != msg.npos && msg.find("has already been 
used") != msg.npos) {
-                retry = false;
-            } else {
-                retry = true;
-            }
-        } else {
-            retry = false;
-        }
-    } else {
+    rapidjson::Document doc;
+    doc.Parse(out_str.c_str());
+    auto status = std::string(doc["Status"].GetString());
+    if (status.find("Fail") != status.npos) {
         retry = true;
+    } else {
+        retry = false;
     }
+#endif
     if (retry) {
-        LOG(INFO) << "fail to replay wal =" << wal << ",status:" << status << 
",msg:" << msg;
+        LOG(INFO) << "fail to replay wal =" << wal;
         std::lock_guard<std::mutex> lock(_replay_wal_lock);
         auto it = _replay_wal_map.find(wal);
         if (it != _replay_wal_map.end()) {
@@ -320,7 +352,7 @@ Status WalTable::_send_request(int64_t wal_id, const 
std::string& wal, const std
             _replay_wal_map.emplace(wal, replay_wal_info {0, UnixMillis(), 
false});
         }
     } else {
-        LOG(INFO) << "success to replay wal =" << wal << ",status:" << status 
<< ",msg:" << msg;
+        LOG(INFO) << "success to replay wal =" << wal;
         RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id));
         
RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, 
wal_id));
         std::lock_guard<std::mutex> lock(_replay_wal_lock);
@@ -384,14 +416,17 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t 
tb_id) {
         std::string columns_str = result.column_info;
         std::vector<std::string> column_element;
         doris::vectorized::WalReader::string_split(columns_str, ",", 
column_element);
-        int64_t index = 1;
+        int64_t column_index = 1;
+        _column_id_name_map.clear();
+        _column_id_index_map.clear();
         for (auto column : column_element) {
             auto pos = column.find(":");
             try {
                 auto column_name = column.substr(0, pos);
                 int64_t column_id = std::strtoll(column.substr(pos + 
1).c_str(), NULL, 10);
                 _column_id_name_map.emplace(column_id, column_name);
-                _column_id_index_map.emplace(column_id, index++);
+                _column_id_index_map.emplace(column_id, column_index);
+                column_index++;
             } catch (const std::invalid_argument& e) {
                 return Status::InvalidArgument("Invalid format, {}", e.what());
             }
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index b97d5de8a54..16c841cc0f3 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -341,11 +341,15 @@ Status 
GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
                     std::vector<std::string> {wal_path}));
             _exec_env->wal_mgr()->add_wal_status_queue(table_id, txn_id,
                                                        
WalManager::WAL_STATUS::REPLAY);
+        } else {
+            RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(txn_id));
+            
RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(table_id, txn_id));
         }
         return st;
     }
     // TODO handle execute and commit error
-    if (!prepare_failed && !result_status.ok()) {
+    if (!prepare_failed && !result_status.ok() &&
+        !(result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) {
         RETURN_IF_ERROR(_exec_env->wal_mgr()->add_wal_path(_db_id, table_id, 
txn_id, label));
         std::string wal_path;
         RETURN_IF_ERROR(_exec_env->wal_mgr()->get_wal_path(txn_id, wal_path));
@@ -458,7 +462,7 @@ Status LoadBlockQueue::create_wal(int64_t db_id, int64_t 
tb_id, int64_t wal_id,
                                   const std::string& import_label, WalManager* 
wal_manager,
                                   std::vector<TSlotDescriptor>& slot_desc, int 
be_exe_version) {
     _v_wal_writer = std::make_shared<vectorized::VWalWriter>(
-            db_id, tb_id, txn_id, label, wal_manager, slot_desc, 
be_exe_version);
+            db_id, tb_id, wal_id, import_label, wal_manager, slot_desc, 
be_exe_version);
     return _v_wal_writer->init();
 }
 
diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp 
b/be/src/vec/exec/format/wal/wal_reader.cpp
index 035ce2cd82c..f0e5e29ca8e 100644
--- a/be/src/vec/exec/format/wal/wal_reader.cpp
+++ b/be/src/vec/exec/format/wal/wal_reader.cpp
@@ -58,7 +58,7 @@ Status WalReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
     for (auto column : columns) {
         auto pos = _column_index[index];
         vectorized::ColumnPtr column_ptr = 
src_block.get_by_position(pos).column;
-        if (column.column->is_nullable()) {
+        if (column_ptr != nullptr && column.column->is_nullable()) {
             column_ptr = make_nullable(column_ptr);
         }
         dst_block.insert(index, 
vectorized::ColumnWithTypeAndName(std::move(column_ptr),
@@ -67,7 +67,7 @@ Status WalReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
     }
     block->swap(dst_block);
     *read_rows = block->rows();
-    VLOG_DEBUG << "read block rows:" << *read_rows;
+    LOG(INFO) << "read block rows:" << *read_rows;
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index a35555554b4..6002f4eea67 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -900,7 +900,7 @@ Status VFileScanner::_get_next_reader() {
 
         _name_to_col_type.clear();
         _missing_cols.clear();
-        static_cast<void>(_cur_reader->get_columns(&_name_to_col_type, 
&_missing_cols));
+        RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, 
&_missing_cols));
         _cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
         RETURN_IF_ERROR(_generate_fill_columns());
         if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp 
b/be/src/vec/sink/group_commit_block_sink.cpp
index bb5c5c70d0c..6b78ad74ee0 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -30,7 +30,7 @@ namespace vectorized {
 
 GroupCommitBlockSink::GroupCommitBlockSink(ObjectPool* pool, const 
RowDescriptor& row_desc,
                                            const std::vector<TExpr>& texprs, 
Status* status)
-        : DataSink(row_desc) {
+        : DataSink(row_desc), _filter_bitmap(1024) {
     // From the thrift expressions create the real exprs.
     *status = vectorized::VExpr::create_expr_trees(texprs, _output_vexpr_ctxs);
     _name = "GroupCommitBlockSink";
@@ -50,6 +50,8 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) {
     _group_commit_mode = table_sink.group_commit_mode;
     _load_id = table_sink.load_id;
     _max_filter_ratio = table_sink.max_filter_ratio;
+    _vpartition = new doris::VOlapTablePartitionParam(_schema, 
table_sink.partition);
+    RETURN_IF_ERROR(_vpartition->init());
     return Status::OK();
 }
 
@@ -139,13 +141,35 @@ Status GroupCommitBlockSink::send(RuntimeState* state, 
vectorized::Block* input_
     bool has_filtered_rows = false;
     RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
             state, input_block, block, _output_vexpr_ctxs, rows, 
has_filtered_rows));
-    if (_block_convertor->num_filtered_rows() > 0) {
+    _has_filtered_rows = false;
+    if (!_vpartition->is_auto_partition()) {
+        //reuse vars for find_partition
+        _partitions.assign(rows, nullptr);
+        _filter_bitmap.Reset(rows);
+
+        for (int index = 0; index < rows; index++) {
+            _vpartition->find_partition(block.get(), index, 
_partitions[index]);
+        }
+        for (int row_index = 0; row_index < rows; row_index++) {
+            if (_partitions[row_index] == nullptr) [[unlikely]] {
+                _filter_bitmap.Set(row_index, true);
+                LOG(WARNING) << "no partition for this tuple. tuple="
+                             << block->dump_data(row_index, 1);
+            }
+            _has_filtered_rows = true;
+        }
+    }
+
+    if (_block_convertor->num_filtered_rows() > 0 || _has_filtered_rows) {
         auto cloneBlock = block->clone_without_columns();
         auto res_block = 
vectorized::MutableBlock::build_mutable_block(&cloneBlock);
         for (int i = 0; i < rows; ++i) {
             if (_block_convertor->filter_map()[i]) {
                 continue;
             }
+            if (_filter_bitmap.Get(i)) {
+                continue;
+            }
             res_block.add_row(block.get(), i);
         }
         block->swap(res_block.to_block());
diff --git a/be/src/vec/sink/group_commit_block_sink.h 
b/be/src/vec/sink/group_commit_block_sink.h
index 2ae37be368a..9adb7f38bcf 100644
--- a/be/src/vec/sink/group_commit_block_sink.h
+++ b/be/src/vec/sink/group_commit_block_sink.h
@@ -70,6 +70,11 @@ private:
     std::vector<std::shared_ptr<vectorized::Block>> _blocks;
     bool _is_block_appended = false;
     double _max_filter_ratio = 0.0;
+    VOlapTablePartitionParam* _vpartition = nullptr;
+    // reuse for find_tablet.
+    std::vector<VOlapTablePartition*> _partitions;
+    Bitmap _filter_bitmap;
+    bool _has_filtered_rows = false;
 };
 
 } // namespace vectorized
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 4ac1abf7f15..4f88d22c836 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -240,6 +240,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -3296,15 +3297,17 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             return result;
         }
 
-        Table table = db.getTable(tableId).get();
-        if (table == null) {
+        Table table;
+        try {
+            table = db.getTable(tableId).get();
+        } catch (NoSuchElementException e) {
             errorStatus.setErrorMsgs(
                     (Lists.newArrayList(String.format("dbId=%d tableId=%d is 
not exists", dbId, tableId))));
             result.setStatus(errorStatus);
             return result;
         }
         StringBuilder sb = new StringBuilder();
-        for (Column column : table.getFullSchema()) {
+        for (Column column : table.getBaseSchema(true)) {
             sb.append(column.getName() + ":" + column.getUniqueId() + ",");
         }
         String columnInfo = sb.toString();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 72a0f061c00..5fe89a07f13 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -25,7 +25,6 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.catalog.MapType;
-import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.StructField;
@@ -330,13 +329,9 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
         if (this.fileFormatType == TFileFormatType.FORMAT_WAL) {
             List<Column> fileColumns = new ArrayList<>();
             Table table = 
Env.getCurrentInternalCatalog().getTableByTableId(tableId);
-            List<Column> tableColumns = table.getBaseSchema(false);
+            List<Column> tableColumns = table.getBaseSchema(true);
             for (int i = 1; i <= tableColumns.size(); i++) {
-                fileColumns.add(new Column("c" + i, tableColumns.get(i - 
1).getDataType(), true));
-            }
-            Column deleteSignColumn = ((OlapTable) 
table).getDeleteSignColumn();
-            if (deleteSignColumn != null) {
-                fileColumns.add(new Column("c" + (tableColumns.size() + 1), 
deleteSignColumn.getDataType(), true));
+                fileColumns.add(new Column("c" + i, tableColumns.get(i - 
1).getType(), true));
             }
             return fileColumns;
         }
diff --git 
a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
 
b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
index 6909a919c67..ed3b1fc832f 100644
--- 
a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
+++ 
b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
@@ -59,8 +59,8 @@ suite("test_group_commit_http_stream") {
         assertTrue(json.GroupCommit)
         assertTrue(json.Label.startsWith("group_commit_"))
         assertEquals(total_rows, json.NumberTotalRows)
-        //assertEquals(loaded_rows, json.NumberLoadedRows)
-        //assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
         assertEquals(unselected_rows, json.NumberUnselectedRows)
         if (filtered_rows > 0) {
             assertFalse(json.ErrorURL.isEmpty())
diff --git 
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
 
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
index b60b6dc5555..e12a1f2f01b 100644
--- 
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
+++ 
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
@@ -58,8 +58,8 @@ suite("test_group_commit_stream_load") {
         assertTrue(json.GroupCommit)
         assertTrue(json.Label.startsWith("group_commit_"))
         assertEquals(total_rows, json.NumberTotalRows)
-        //assertEquals(loaded_rows, json.NumberLoadedRows)
-        //assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
         assertEquals(unselected_rows, json.NumberUnselectedRows)
         if (filtered_rows > 0) {
             assertFalse(json.ErrorURL.isEmpty())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to