This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 66fbb22ad74 [fix](group commit) Fix some wal problems on group commit (#28554)
66fbb22ad74 is described below
commit 66fbb22ad7447fa714da68f3c02598eada7a9cb4
Author: huanghaibin <[email protected]>
AuthorDate: Tue Dec 19 09:51:03 2023 +0800
[fix](group commit) Fix some wal problems on group commit (#28554)
---
be/src/common/config.cpp | 1 +
be/src/common/config.h | 1 +
be/src/olap/wal_manager.cpp | 34 ++++--
be/src/olap/wal_manager.h | 3 +
be/src/olap/wal_table.cpp | 119 +++++++++++++--------
be/src/runtime/group_commit_mgr.cpp | 8 +-
be/src/vec/exec/format/wal/wal_reader.cpp | 4 +-
be/src/vec/exec/scan/vfile_scanner.cpp | 2 +-
be/src/vec/sink/group_commit_block_sink.cpp | 28 ++++-
be/src/vec/sink/group_commit_block_sink.h | 5 +
.../apache/doris/service/FrontendServiceImpl.java | 9 +-
.../ExternalFileTableValuedFunction.java | 9 +-
.../test_group_commit_http_stream.groovy | 4 +-
.../test_group_commit_stream_load.groovy | 4 +-
14 files changed, 160 insertions(+), 71 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f286c33d547..2722b5a8a16 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1109,6 +1109,7 @@ DEFINE_Int16(bitmap_serialize_version, "1");
DEFINE_String(group_commit_replay_wal_dir, "./wal");
DEFINE_Int32(group_commit_replay_wal_retry_num, "10");
DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5");
+DEFINE_Int32(group_commit_relay_wal_threads, "10");
// the count of thread to group commit insert
DEFINE_Int32(group_commit_insert_threads, "10");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4054b315aa4..37020c56be4 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1182,6 +1182,7 @@ DECLARE_Int16(bitmap_serialize_version);
DECLARE_String(group_commit_replay_wal_dir);
DECLARE_Int32(group_commit_replay_wal_retry_num);
DECLARE_Int32(group_commit_replay_wal_retry_interval_seconds);
+DECLARE_Int32(group_commit_relay_wal_threads);
// This config can be set to limit thread number in group commit insert thread pool.
DECLARE_mInt32(group_commit_insert_threads);
diff --git a/be/src/olap/wal_manager.cpp b/be/src/olap/wal_manager.cpp
index abb7d8a324b..f03075609ea 100644
--- a/be/src/olap/wal_manager.cpp
+++ b/be/src/olap/wal_manager.cpp
@@ -42,6 +42,10 @@ WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list)
doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs);
_all_wal_disk_bytes = std::make_shared<std::atomic_size_t>(0);
_cv = std::make_shared<std::condition_variable>();
+ static_cast<void>(ThreadPoolBuilder("GroupCommitReplayWalThreadPool")
+ .set_min_threads(1)
+ .set_max_threads(config::group_commit_relay_wal_threads)
+ .build(&_thread_pool));
}
WalManager::~WalManager() {
@@ -56,6 +60,7 @@ void WalManager::stop() {
if (_replay_thread) {
_replay_thread->join();
}
+ _thread_pool->shutdown();
LOG(INFO) << "WalManager is stopped";
}
}
@@ -161,6 +166,10 @@ Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id,
<< std::to_string(wal_id) << "_" << label;
{
std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
+ auto it = _wal_path_map.find(wal_id);
+ if (it != _wal_path_map.end()) {
+ return Status::InternalError("wal_id {} already in wal_path_map", wal_id);
+ }
_wal_path_map.emplace(wal_id, ss.str());
}
return Status::OK();
@@ -299,10 +308,12 @@ Status WalManager::replay() {
}
}
for (const auto& table_id : replay_tables) {
- auto st = _table_map[table_id]->replay_wals();
- if (!st.ok()) {
- LOG(WARNING) << "Failed add replay wal on table " << table_id;
- }
+ RETURN_IF_ERROR(_thread_pool->submit_func([table_id, this] {
+ auto st = this->_table_map[table_id]->replay_wals();
+ if (!st.ok()) {
+ LOG(WARNING) << "Failed add replay wal on table " << table_id;
+ }
+ }));
}
} while (!_stop_background_threads_latch.wait_for(
std::chrono::seconds(config::group_commit_replay_wal_retry_interval_seconds)));
@@ -351,10 +362,13 @@ Status WalManager::delete_wal(int64_t wal_id) {
if (_wal_id_to_writer_map.empty()) {
CHECK_EQ(_all_wal_disk_bytes->load(std::memory_order_relaxed), 0);
}
- std::string wal_path = _wal_path_map[wal_id];
- RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(wal_path));
- LOG(INFO) << "delete file=" << wal_path;
- _wal_path_map.erase(wal_id);
+ auto it = _wal_path_map.find(wal_id);
+ if (it != _wal_path_map.end()) {
+ std::string wal_path = it->second;
+ RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(wal_path));
+ LOG(INFO) << "delete file=" << wal_path;
+ _wal_path_map.erase(it);
+ }
}
return Status::OK();
}
@@ -371,10 +385,13 @@ void WalManager::stop_relay_wal() {
}
void WalManager::add_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index) {
+ std::lock_guard<std::shared_mutex> wrlock(_wal_column_id_map_lock);
_wal_column_id_map.emplace(wal_id, column_index);
+ LOG(INFO) << "add " << wal_id << " to wal_column_id_map";
}
void WalManager::erase_wal_column_index(int64_t wal_id) {
+ std::lock_guard<std::shared_mutex> wrlock(_wal_column_id_map_lock);
if (_wal_column_id_map.erase(wal_id)) {
LOG(INFO) << "erase " << wal_id << " from wal_column_id_map";
} else {
@@ -383,6 +400,7 @@ void WalManager::erase_wal_column_index(int64_t wal_id) {
}
Status WalManager::get_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index) {
+ std::shared_lock<std::shared_mutex> rdlock(_wal_column_id_map_lock);
auto it = _wal_column_id_map.find(wal_id);
if (it != _wal_column_id_map.end()) {
column_index = it->second;
diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h
index d0a547a8d6f..fdeadbee1d0 100644
--- a/be/src/olap/wal_manager.h
+++ b/be/src/olap/wal_manager.h
@@ -31,6 +31,7 @@
#include "runtime/exec_env.h"
#include "runtime/stream_load/stream_load_context.h"
#include "util/thread.h"
+#include "util/threadpool.h"
namespace doris {
class WalManager {
@@ -86,7 +87,9 @@ private:
std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes;
std::unordered_map<int64_t, std::unordered_map<int64_t, WAL_STATUS>> _wal_status_queues;
std::atomic<bool> _stop;
+ std::shared_mutex _wal_column_id_map_lock;
std::unordered_map<int64_t, std::vector<size_t>&> _wal_column_id_map;
std::shared_ptr<std::condition_variable> _cv;
+ std::unique_ptr<doris::ThreadPool> _thread_pool;
};
} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal_table.cpp b/be/src/olap/wal_table.cpp
index 4bfcec502a8..54d158e7a95 100644
--- a/be/src/olap/wal_table.cpp
+++ b/be/src/olap/wal_table.cpp
@@ -48,6 +48,8 @@ WalTable::~WalTable() {}
std::string k_request_line;
#endif
+thread_local bool retry = false; // per thread: replay_wals() runs concurrently on WalManager's pool
+
void WalTable::add_wals(std::vector<std::string> wals) {
std::lock_guard<std::mutex> lock(_replay_wal_lock);
for (const auto& wal : wals) {
@@ -57,6 +59,7 @@ void WalTable::add_wals(std::vector<std::string> wals) {
}
Status WalTable::replay_wals() {
std::vector<std::string> need_replay_wals;
+ std::vector<std::string> need_erase_wals;
{
std::lock_guard<std::mutex> lock(_replay_wal_lock);
if (_replay_wal_map.empty()) {
@@ -76,7 +79,7 @@ Status WalTable::replay_wals() {
std::string rename_path = _get_tmp_path(wal);
LOG(INFO) << "rename wal from " << wal << " to " << rename_path;
std::rename(wal.c_str(), rename_path.c_str());
- _replay_wal_map.erase(wal);
+ need_erase_wals.push_back(wal);
continue;
}
if (_need_replay(info)) {
@@ -84,6 +87,13 @@ Status WalTable::replay_wals() {
}
}
std::sort(need_replay_wals.begin(), need_replay_wals.end());
+ for (const auto& wal : need_erase_wals) {
+ if (_replay_wal_map.erase(wal)) {
+ LOG(INFO) << "erase wal " << wal << " from _replay_wal_map";
+ } else {
+ LOG(WARNING) << "fail to erase wal " << wal << " from _replay_wal_map";
+ }
+ }
}
for (const auto& wal : need_replay_wals) {
{
@@ -216,7 +226,49 @@ Status WalTable::_get_wal_info(const std::string& wal,
}
void http_request_done(struct evhttp_request* req, void* arg) {
- event_base_loopbreak((struct event_base*)arg);
+ std::stringstream out;
+ std::string status;
+ std::string msg;
+ size_t len = 0;
+ // Pessimistic default: without a parseable response (null request, connection
+ // failure, empty or malformed body) the load may not have happened, so the wal
+ // must be kept and retried rather than deleted.
+ retry = true;
+ if (req != nullptr) {
+ auto input = evhttp_request_get_input_buffer(req);
+ char* request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);
+ while (request_line != nullptr) {
+ out << request_line;
+ // evbuffer_readln() returns a malloc'd line that the caller must free.
+ free(request_line);
+ request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);
+ }
+ auto out_str = out.str();
+ LOG(INFO) << "replay wal out_str:" << out_str;
+ rapidjson::Document doc;
+ if (!out_str.empty() && !doc.Parse(out_str.c_str()).HasParseError() &&
+ doc.IsObject() && doc.HasMember("Status") && doc["Status"].IsString() &&
+ doc.HasMember("Message") && doc["Message"].IsString()) {
+ status = std::string(doc["Status"].GetString());
+ msg = std::string(doc["Message"].GetString());
+ LOG(INFO) << "replay wal status:" << status << ",msg:" << msg;
+ // "Label ... has already been used" means an earlier attempt already loaded the
+ // rows, so the wal counts as replayed; any other failure is retried.
+ bool label_used = msg.find("Label") != msg.npos &&
+ msg.find("has already been used") != msg.npos;
+ retry = status.find("Fail") != status.npos && !label_used;
+ } else {
+ LOG(WARNING) << "fail to parse replay wal response:" << out_str;
+ }
+ } else {
+ LOG(WARNING) << "req is null";
+ }
+
+ if (arg != nullptr) {
+ event_base_loopbreak((struct event_base*)arg);
+ } else {
+ LOG(WARNING) << "arg is null";
+ }
}
Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std::string& label) {
@@ -224,6 +276,7 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std
struct event_base* base = nullptr;
struct evhttp_connection* conn = nullptr;
struct evhttp_request* req = nullptr;
+ retry = true; // keep the wal unless http_request_done() reports success
event_init();
base = event_base_new();
conn = evhttp_connection_new("127.0.0.1", doris::config::webserver_port);
@@ -239,18 +292,20 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std
std::vector<size_t> index_vector;
std::stringstream ss_name;
std::stringstream ss_id;
- int index = 0;
+ int index_raw = 0;
for (auto column_id_str : column_id_element) {
try {
int64_t column_id = std::strtoll(column_id_str.c_str(), NULL, 10);
auto it = _column_id_name_map.find(column_id);
- if (it != _column_id_name_map.end()) {
- ss_name << it->second << ",";
+ auto it2 = _column_id_index_map.find(column_id);
+ if (it != _column_id_name_map.end() && it2 != _column_id_index_map.end()) {
+ ss_name << "`" << it->second << "`,";
ss_id << "c" << std::to_string(_column_id_index_map[column_id]) << ",";
- index_vector.emplace_back(index);
+ index_vector.emplace_back(index_raw);
_column_id_name_map.erase(column_id);
+ _column_id_index_map.erase(column_id);
}
- index++;
+ index_raw++;
} catch (const std::invalid_argument& e) {
return Status::InvalidArgument("Invalid format, {}", e.what());
}
@@ -273,44 +328,21 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std
evhttp_connection_free(conn);
event_base_free(base);
-#endif
- bool retry = false;
- std::string status;
- std::string msg;
- std::stringstream out;
- rapidjson::Document doc;
-#ifndef BE_TEST
- size_t len = 0;
- auto input = evhttp_request_get_input_buffer(req);
- char* request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);
- while (request_line != nullptr) {
- std::string s(request_line);
- out << request_line;
- request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);
- }
#else
+ std::stringstream out;
out << k_request_line;
-#endif
auto out_str = out.str();
- if (!out_str.empty()) {
- doc.Parse(out.str().c_str());
- status = std::string(doc["Status"].GetString());
- msg = std::string(doc["Message"].GetString());
- LOG(INFO) << "replay wal " << wal_id << " status:" << status << ",msg:" << msg;
- if (status.find("Fail") != status.npos) {
- if (msg.find("Label") != msg.npos && msg.find("has already been used") != msg.npos) {
- retry = false;
- } else {
- retry = true;
- }
- } else {
- retry = false;
- }
- } else {
+ rapidjson::Document doc;
+ doc.Parse(out_str.c_str());
+ auto status = std::string(doc["Status"].GetString());
+ if (status.find("Fail") != status.npos) {
retry = true;
+ } else {
+ retry = false;
}
+#endif
if (retry) {
- LOG(INFO) << "fail to replay wal =" << wal << ",status:" << status << ",msg:" << msg;
+ LOG(INFO) << "fail to replay wal =" << wal;
std::lock_guard<std::mutex> lock(_replay_wal_lock);
auto it = _replay_wal_map.find(wal);
if (it != _replay_wal_map.end()) {
@@ -320,7 +352,7 @@ Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std
_replay_wal_map.emplace(wal, replay_wal_info {0, UnixMillis(), false});
}
} else {
- LOG(INFO) << "success to replay wal =" << wal << ",status:" << status << ",msg:" << msg;
+ LOG(INFO) << "success to replay wal =" << wal;
RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id));
RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, wal_id));
std::lock_guard<std::mutex> lock(_replay_wal_lock);
@@ -384,14 +416,17 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id) {
std::string columns_str = result.column_info;
std::vector<std::string> column_element;
doris::vectorized::WalReader::string_split(columns_str, ",", column_element);
- int64_t index = 1;
+ int64_t column_index = 1;
+ _column_id_name_map.clear();
+ _column_id_index_map.clear();
for (auto column : column_element) {
auto pos = column.find(":");
try {
auto column_name = column.substr(0, pos);
int64_t column_id = std::strtoll(column.substr(pos + 1).c_str(), NULL, 10);
_column_id_name_map.emplace(column_id, column_name);
- _column_id_index_map.emplace(column_id, index++);
+ _column_id_index_map.emplace(column_id, column_index);
+ column_index++;
} catch (const std::invalid_argument& e) {
return Status::InvalidArgument("Invalid format, {}", e.what());
}
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index b97d5de8a54..16c841cc0f3 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -341,11 +341,15 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
std::vector<std::string> {wal_path}));
_exec_env->wal_mgr()->add_wal_status_queue(table_id, txn_id, WalManager::WAL_STATUS::REPLAY);
+ } else {
+ RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(txn_id));
+ RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(table_id, txn_id));
}
return st;
}
// TODO handle execute and commit error
- if (!prepare_failed && !result_status.ok()) {
+ if (!prepare_failed && !result_status.ok() &&
+ !(result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) {
RETURN_IF_ERROR(_exec_env->wal_mgr()->add_wal_path(_db_id, table_id, txn_id, label));
std::string wal_path;
RETURN_IF_ERROR(_exec_env->wal_mgr()->get_wal_path(txn_id, wal_path));
@@ -458,7 +462,7 @@ Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,
const std::string& import_label, WalManager* wal_manager,
std::vector<TSlotDescriptor>& slot_desc, int be_exe_version) {
_v_wal_writer = std::make_shared<vectorized::VWalWriter>(
- db_id, tb_id, txn_id, label, wal_manager, slot_desc, be_exe_version);
+ db_id, tb_id, wal_id, import_label, wal_manager, slot_desc, be_exe_version);
return _v_wal_writer->init();
}
diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp b/be/src/vec/exec/format/wal/wal_reader.cpp
index 035ce2cd82c..f0e5e29ca8e 100644
--- a/be/src/vec/exec/format/wal/wal_reader.cpp
+++ b/be/src/vec/exec/format/wal/wal_reader.cpp
@@ -58,7 +58,7 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
for (auto column : columns) {
auto pos = _column_index[index];
vectorized::ColumnPtr column_ptr = src_block.get_by_position(pos).column;
- if (column.column->is_nullable()) {
+ if (column_ptr != nullptr && column.column->is_nullable()) {
column_ptr = make_nullable(column_ptr);
}
dst_block.insert(index, vectorized::ColumnWithTypeAndName(std::move(column_ptr),
@@ -67,7 +67,7 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
}
block->swap(dst_block);
*read_rows = block->rows();
- VLOG_DEBUG << "read block rows:" << *read_rows;
+ LOG(INFO) << "read block rows:" << *read_rows;
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index a35555554b4..6002f4eea67 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -900,7 +900,7 @@ Status VFileScanner::_get_next_reader() {
_name_to_col_type.clear();
_missing_cols.clear();
- static_cast<void>(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols));
+ RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols));
_cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
RETURN_IF_ERROR(_generate_fill_columns());
if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp
index bb5c5c70d0c..6b78ad74ee0 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -30,7 +30,7 @@ namespace vectorized {
GroupCommitBlockSink::GroupCommitBlockSink(ObjectPool* pool, const RowDescriptor& row_desc,
const std::vector<TExpr>& texprs, Status* status)
- : DataSink(row_desc) {
+ : DataSink(row_desc), _filter_bitmap(1024) {
// From the thrift expressions create the real exprs.
*status = vectorized::VExpr::create_expr_trees(texprs, _output_vexpr_ctxs);
_name = "GroupCommitBlockSink";
@@ -50,6 +50,8 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) {
_group_commit_mode = table_sink.group_commit_mode;
_load_id = table_sink.load_id;
_max_filter_ratio = table_sink.max_filter_ratio;
+ _vpartition = std::make_unique<doris::VOlapTablePartitionParam>(_schema, table_sink.partition);
+ RETURN_IF_ERROR(_vpartition->init());
return Status::OK();
}
@@ -139,13 +141,35 @@ Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block* input_
bool has_filtered_rows = false;
RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
state, input_block, block, _output_vexpr_ctxs, rows, has_filtered_rows));
- if (_block_convertor->num_filtered_rows() > 0) {
+ _has_filtered_rows = false;
+ _filter_bitmap.Reset(rows); // read for every row below, including auto partition tables
+ if (!_vpartition->is_auto_partition()) {
+ //reuse vars for find_partition
+ _partitions.assign(rows, nullptr);
+
+ for (int index = 0; index < rows; index++) {
+ _vpartition->find_partition(block.get(), index, _partitions[index]);
+ }
+ for (int row_index = 0; row_index < rows; row_index++) {
+ if (_partitions[row_index] == nullptr) [[unlikely]] {
+ _filter_bitmap.Set(row_index, true);
+ LOG(WARNING) << "no partition for this tuple. tuple="
+ << block->dump_data(row_index, 1);
+ _has_filtered_rows = true;
+ }
+ }
+ }
+
+ if (_block_convertor->num_filtered_rows() > 0 || _has_filtered_rows) {
auto cloneBlock = block->clone_without_columns();
auto res_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
for (int i = 0; i < rows; ++i) {
if (_block_convertor->filter_map()[i]) {
continue;
}
+ if (_filter_bitmap.Get(i)) {
+ continue;
+ }
res_block.add_row(block.get(), i);
}
block->swap(res_block.to_block());
diff --git a/be/src/vec/sink/group_commit_block_sink.h b/be/src/vec/sink/group_commit_block_sink.h
index 2ae37be368a..9adb7f38bcf 100644
--- a/be/src/vec/sink/group_commit_block_sink.h
+++ b/be/src/vec/sink/group_commit_block_sink.h
@@ -70,6 +70,11 @@ private:
std::vector<std::shared_ptr<vectorized::Block>> _blocks;
bool _is_block_appended = false;
double _max_filter_ratio = 0.0;
+ std::unique_ptr<VOlapTablePartitionParam> _vpartition;
+ // reuse for find_partition.
+ std::vector<VOlapTablePartition*> _partitions;
+ Bitmap _filter_bitmap;
+ bool _has_filtered_rows = false;
};
} // namespace vectorized
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 4ac1abf7f15..4f88d22c836 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -240,6 +240,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@@ -3296,15 +3297,17 @@ public class FrontendServiceImpl implements FrontendService.Iface {
return result;
}
- Table table = db.getTable(tableId).get();
- if (table == null) {
+ Table table;
+ try {
+ table = db.getTable(tableId).get();
+ } catch (NoSuchElementException e) {
errorStatus.setErrorMsgs(
(Lists.newArrayList(String.format("dbId=%d tableId=%d is not exists", dbId, tableId))));
result.setStatus(errorStatus);
return result;
}
StringBuilder sb = new StringBuilder();
- for (Column column : table.getFullSchema()) {
+ for (Column column : table.getBaseSchema(true)) {
sb.append(column.getName() + ":" + column.getUniqueId() + ",");
}
String columnInfo = sb.toString();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 72a0f061c00..5fe89a07f13 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -25,7 +25,6 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.MapType;
-import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
@@ -330,13 +329,9 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
if (this.fileFormatType == TFileFormatType.FORMAT_WAL) {
List<Column> fileColumns = new ArrayList<>();
Table table = Env.getCurrentInternalCatalog().getTableByTableId(tableId);
- List<Column> tableColumns = table.getBaseSchema(false);
+ List<Column> tableColumns = table.getBaseSchema(true);
for (int i = 1; i <= tableColumns.size(); i++) {
- fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getDataType(), true));
- }
- Column deleteSignColumn = ((OlapTable) table).getDeleteSignColumn();
- if (deleteSignColumn != null) {
- fileColumns.add(new Column("c" + (tableColumns.size() + 1), deleteSignColumn.getDataType(), true));
+ fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getType(), true));
}
return fileColumns;
}
diff --git a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
index 6909a919c67..ed3b1fc832f 100644
--- a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
+++ b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
@@ -59,8 +59,8 @@ suite("test_group_commit_http_stream") {
assertTrue(json.GroupCommit)
assertTrue(json.Label.startsWith("group_commit_"))
assertEquals(total_rows, json.NumberTotalRows)
- //assertEquals(loaded_rows, json.NumberLoadedRows)
- //assertEquals(filtered_rows, json.NumberFilteredRows)
+ assertEquals(loaded_rows, json.NumberLoadedRows)
+ assertEquals(filtered_rows, json.NumberFilteredRows)
assertEquals(unselected_rows, json.NumberUnselectedRows)
if (filtered_rows > 0) {
assertFalse(json.ErrorURL.isEmpty())
diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
index b60b6dc5555..e12a1f2f01b 100644
--- a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
@@ -58,8 +58,8 @@ suite("test_group_commit_stream_load") {
assertTrue(json.GroupCommit)
assertTrue(json.Label.startsWith("group_commit_"))
assertEquals(total_rows, json.NumberTotalRows)
- //assertEquals(loaded_rows, json.NumberLoadedRows)
- //assertEquals(filtered_rows, json.NumberFilteredRows)
+ assertEquals(loaded_rows, json.NumberLoadedRows)
+ assertEquals(filtered_rows, json.NumberFilteredRows)
assertEquals(unselected_rows, json.NumberUnselectedRows)
if (filtered_rows > 0) {
assertFalse(json.ErrorURL.isEmpty())
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]