This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4599c4e474c [fix](group commit) Fix compatibility issues on
serializing and deserializing wal file (#32299)
4599c4e474c is described below
commit 4599c4e474cb89ccbffc522c5141f32d69539231
Author: huanghaibin <[email protected]>
AuthorDate: Tue Mar 19 22:04:45 2024 +0800
[fix](group commit) Fix compatibility issues on serializing and
deserializing wal file (#32299)
---
be/src/olap/wal/wal_manager.cpp | 5 +--
be/src/olap/wal/wal_manager.h | 7 ++--
be/src/olap/wal/wal_reader.cpp | 4 +--
be/src/olap/wal/wal_reader.h | 3 +-
be/src/olap/wal/wal_table.cpp | 5 ++-
be/src/olap/wal/wal_writer.cpp | 4 +--
be/src/olap/wal/wal_writer.h | 2 +-
be/src/runtime/group_commit_mgr.cpp | 20 +++++++----
be/src/vec/exec/format/wal/wal_reader.cpp | 11 +++++-
be/src/vec/exec/format/wal/wal_reader.h | 1 +
be/src/vec/sink/writer/vwal_writer.cpp | 2 +-
be/src/vec/sink/writer/vwal_writer.h | 2 --
be/test/exec/test_data/wal_scanner/wal | Bin 180 -> 0 bytes
be/test/exec/test_data/wal_scanner/wal_version0 | Bin 0 -> 220 bytes
be/test/exec/test_data/wal_scanner/wal_version1 | Bin 0 -> 272 bytes
be/test/vec/exec/vwal_scanner_test.cpp | 43 +++++++++++++++++++-----
16 files changed, 78 insertions(+), 31 deletions(-)
diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp
index 10a979f89dd..cce5d14d69e 100644
--- a/be/src/olap/wal/wal_manager.cpp
+++ b/be/src/olap/wal/wal_manager.cpp
@@ -205,11 +205,12 @@ size_t WalManager::get_wal_queue_size(int64_t table_id) {
}
Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t
wal_id,
- const std::string& label, std::string&
base_path) {
+ const std::string& label, std::string&
base_path,
+ uint32_t wal_version) {
base_path = _wal_dirs_info->get_available_random_wal_dir();
std::stringstream ss;
ss << base_path << "/" << std::to_string(db_id) << "/" <<
std::to_string(table_id) << "/"
- << _wal_version << "_" << _exec_env->master_info()->backend_id << "_"
+ << std::to_string(wal_version) << "_" <<
_exec_env->master_info()->backend_id << "_"
<< std::to_string(wal_id) << "_" << label;
{
std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
diff --git a/be/src/olap/wal/wal_manager.h b/be/src/olap/wal/wal_manager.h
index 3710b9e31ca..9cc9f95bd79 100644
--- a/be/src/olap/wal/wal_manager.h
+++ b/be/src/olap/wal/wal_manager.h
@@ -73,7 +73,7 @@ public:
// replay wal
Status create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id,
- const std::string& label, std::string& base_path);
+ const std::string& label, std::string& base_path,
uint32_t wal_version);
Status get_wal_path(int64_t wal_id, std::string& wal_path);
Status delete_wal(int64_t table_id, int64_t wal_id);
Status rename_to_tmp_path(const std::string wal, int64_t table_id, int64_t
wal_id);
@@ -144,7 +144,6 @@ private:
std::shared_mutex _wal_queue_lock;
std::unordered_map<int64_t, std::set<int64_t>> _wal_queues;
- int64_t _wal_version = 0;
std::atomic<bool> _first_replay;
// for test relay
@@ -154,4 +153,8 @@ private:
std::shared_mutex _wal_cv_lock;
std::unordered_map<int64_t, WalCvInfo> _wal_cv_map;
};
+
+// In Doris 2.1.0 the WAL version is 0; it is now upgraded to 1 to solve
compatibility issues.
+// see https://github.com/apache/doris/pull/32299
+constexpr inline uint32_t WAL_VERSION = 1;
} // namespace doris
diff --git a/be/src/olap/wal/wal_reader.cpp b/be/src/olap/wal/wal_reader.cpp
index d1263daf1c5..9e4618b2bc1 100644
--- a/be/src/olap/wal/wal_reader.cpp
+++ b/be/src/olap/wal/wal_reader.cpp
@@ -85,7 +85,7 @@ Status WalReader::read_block(PBlock& block) {
return Status::OK();
}
-Status WalReader::read_header(std::string& col_ids) {
+Status WalReader::read_header(uint32_t& version, std::string& col_ids) {
if (file_reader->size() == 0) {
return Status::DataQualityError("empty file");
}
@@ -101,7 +101,7 @@ Status WalReader::read_header(std::string& col_ids) {
RETURN_IF_ERROR(
file_reader->read_at(_offset, {version_buf,
WalWriter::VERSION_SIZE}, &bytes_read));
_offset += WalWriter::VERSION_SIZE;
- _version = decode_fixed32_le(version_buf);
+ version = decode_fixed32_le(version_buf);
uint8_t len_buf[WalWriter::LENGTH_SIZE];
RETURN_IF_ERROR(file_reader->read_at(_offset, {len_buf,
WalWriter::LENGTH_SIZE}, &bytes_read));
_offset += WalWriter::LENGTH_SIZE;
diff --git a/be/src/olap/wal/wal_reader.h b/be/src/olap/wal/wal_reader.h
index 1f26a7598f0..c47d029331a 100644
--- a/be/src/olap/wal/wal_reader.h
+++ b/be/src/olap/wal/wal_reader.h
@@ -32,13 +32,12 @@ public:
Status finalize();
Status read_block(PBlock& block);
- Status read_header(std::string& col_ids);
+ Status read_header(uint32_t& version, std::string& col_ids);
private:
Status _check_checksum(const char* binary, size_t size, uint32_t checksum);
std::string _file_name;
- uint32_t _version = 0;
size_t _offset;
io::FileReaderSPtr file_reader;
};
diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp
index 14e3779748c..641ef8c6647 100644
--- a/be/src/olap/wal/wal_table.cpp
+++ b/be/src/olap/wal/wal_table.cpp
@@ -321,7 +321,10 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t
tb_id,
Status WalTable::_read_wal_header(const std::string& wal_path, std::string&
columns) {
std::shared_ptr<doris::WalReader> wal_reader =
std::make_shared<WalReader>(wal_path);
RETURN_IF_ERROR(wal_reader->init());
- RETURN_IF_ERROR(wal_reader->read_header(columns));
+ uint32_t version = 0;
+ RETURN_IF_ERROR(wal_reader->read_header(version, columns));
+ VLOG_DEBUG << "wal=" << wal_path << ",version=" << std::to_string(version)
+ << ",columns=" << columns;
RETURN_IF_ERROR(wal_reader->finalize());
return Status::OK();
}
diff --git a/be/src/olap/wal/wal_writer.cpp b/be/src/olap/wal/wal_writer.cpp
index 43f021c9d31..5658eded564 100644
--- a/be/src/olap/wal/wal_writer.cpp
+++ b/be/src/olap/wal/wal_writer.cpp
@@ -90,7 +90,7 @@ Status WalWriter::append_blocks(const PBlockArray& blocks) {
return Status::OK();
}
-Status WalWriter::append_header(uint32_t version, std::string col_ids) {
+Status WalWriter::append_header(std::string col_ids) {
if (!_file_writer) {
return Status::InternalError("wal writer is null,fail to write
file={}", _file_name);
}
@@ -105,7 +105,7 @@ Status WalWriter::append_header(uint32_t version,
std::string col_ids) {
offset += k_wal_magic_length;
uint8_t version_buf[sizeof(uint32_t)];
- encode_fixed32_le(version_buf, version);
+ encode_fixed32_le(version_buf, WAL_VERSION);
RETURN_IF_ERROR(_file_writer->append({version_buf, sizeof(uint32_t)}));
offset += VERSION_SIZE;
uint8_t len_buf[sizeof(uint64_t)];
diff --git a/be/src/olap/wal/wal_writer.h b/be/src/olap/wal/wal_writer.h
index 08d2a4eb710..f730e026660 100644
--- a/be/src/olap/wal/wal_writer.h
+++ b/be/src/olap/wal/wal_writer.h
@@ -36,7 +36,7 @@ public:
Status finalize();
Status append_blocks(const PBlockArray& blocks);
- Status append_header(uint32_t version, std::string col_ids);
+ Status append_header(std::string col_ids);
std::string file_name() { return _file_name; };
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index cadff231761..646b58c5fdc 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -314,8 +314,12 @@ Status GroupCommitTable::_create_group_commit_load(int
be_exe_version) {
st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline,
params,
pipeline_params);
if (!st.ok()) {
- static_cast<void>(_finish_group_commit_load(_db_id, _table_id, label,
txn_id, instance_id,
- st, nullptr));
+ auto finish_st = _finish_group_commit_load(_db_id, _table_id, label,
txn_id, instance_id,
+ st, nullptr);
+ if (!finish_st.ok()) {
+ LOG(WARNING) << "finish group commit error, label=" << label
+ << ", st=" << finish_st.to_string();
+ }
}
return st;
}
@@ -432,8 +436,12 @@ Status GroupCommitTable::_exec_plan_fragment(int64_t
db_id, int64_t table_id,
const TExecPlanFragmentParams&
params,
const TPipelineFragmentParams&
pipeline_params) {
auto finish_cb = [db_id, table_id, label, txn_id, this](RuntimeState*
state, Status* status) {
- static_cast<void>(_finish_group_commit_load(db_id, table_id, label,
txn_id,
-
state->fragment_instance_id(), *status, state));
+ auto finish_st = _finish_group_commit_load(db_id, table_id, label,
txn_id,
+
state->fragment_instance_id(), *status, state);
+ if (!finish_st.ok()) {
+ LOG(WARNING) << "finish group commit error, label=" << label
+ << ", st=" << finish_st.to_string();
+ }
};
if (is_pipeline) {
return _exec_env->fragment_mgr()->exec_plan_fragment(pipeline_params,
finish_cb);
@@ -512,8 +520,8 @@ Status LoadBlockQueue::create_wal(int64_t db_id, int64_t
tb_id, int64_t wal_id,
std::string real_label = config::group_commit_wait_replay_wal_finish
? import_label + "_test_wait"
: import_label;
- RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->create_wal_path(db_id,
tb_id, wal_id,
-
real_label, _wal_base_path));
+ RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->create_wal_path(
+ db_id, tb_id, wal_id, real_label, _wal_base_path, WAL_VERSION));
_v_wal_writer = std::make_shared<vectorized::VWalWriter>(
db_id, tb_id, wal_id, real_label, wal_manager, slot_desc,
be_exe_version);
return _v_wal_writer->init();
diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp
b/be/src/vec/exec/format/wal/wal_reader.cpp
index 981e740d05c..1fbd395ad26 100644
--- a/be/src/vec/exec/format/wal/wal_reader.cpp
+++ b/be/src/vec/exec/format/wal/wal_reader.cpp
@@ -47,6 +47,15 @@ Status WalReader::get_next_block(Block* block, size_t*
read_rows, bool* eof) {
//read src block
PBlock pblock;
auto st = _wal_reader->read_block(pblock);
+ // Due to historical reasons, be_exec_version=3 will use the new way to
serialize block
+ // in doris 2.1.0, now it has been corrected to use the old way to do
serialize and deserialize
+ // in the latest version. So if a wal is created by 2.1.0 (wal version=0
&& be_exec_version=3),
+ // it should upgrade the be_exec_version to 4 to use the new way to
deserialize pblock to solve
+ // compatibility issues. See https://github.com/apache/doris/pull/32299
+ if (_version == 0 && pblock.has_be_exec_version() &&
pblock.be_exec_version() == 3) {
+ VLOG_DEBUG << "need to set be_exec_version to 4 to solve compatibility
issues";
+ pblock.set_be_exec_version(4);
+ }
if (st.is<ErrorCode::END_OF_FILE>()) {
LOG(INFO) << "read eof on wal:" << _wal_path;
*read_rows = 0;
@@ -99,7 +108,7 @@ Status WalReader::get_next_block(Block* block, size_t*
read_rows, bool* eof) {
Status WalReader::get_columns(std::unordered_map<std::string, TypeDescriptor>*
name_to_type,
std::unordered_set<std::string>* missing_cols) {
std::string col_ids;
- RETURN_IF_ERROR(_wal_reader->read_header(col_ids));
+ RETURN_IF_ERROR(_wal_reader->read_header(_version, col_ids));
std::vector<std::string> column_id_vector =
strings::Split(col_ids, ",", strings::SkipWhitespace());
_column_id_count = column_id_vector.size();
diff --git a/be/src/vec/exec/format/wal/wal_reader.h
b/be/src/vec/exec/format/wal/wal_reader.h
index d2636d5495e..09311496c16 100644
--- a/be/src/vec/exec/format/wal/wal_reader.h
+++ b/be/src/vec/exec/format/wal/wal_reader.h
@@ -41,6 +41,7 @@ private:
// column_id, column_pos
std::map<int64_t, int64_t> _column_pos_map;
int64_t _column_id_count;
+ uint32_t _version = 0;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/sink/writer/vwal_writer.cpp
b/be/src/vec/sink/writer/vwal_writer.cpp
index 569f9bcd652..76e0bf0679c 100644
--- a/be/src/vec/sink/writer/vwal_writer.cpp
+++ b/be/src/vec/sink/writer/vwal_writer.cpp
@@ -60,7 +60,7 @@ Status VWalWriter::init() {
ss << std::to_string(slot_desc.col_unique_id) << ",";
}
std::string col_ids = ss.str().substr(0, ss.str().size() - 1);
- RETURN_IF_ERROR(_wal_writer->append_header(_version, col_ids));
+ RETURN_IF_ERROR(_wal_writer->append_header(col_ids));
return Status::OK();
}
diff --git a/be/src/vec/sink/writer/vwal_writer.h
b/be/src/vec/sink/writer/vwal_writer.h
index f22250cb5d4..a9fa218f330 100644
--- a/be/src/vec/sink/writer/vwal_writer.h
+++ b/be/src/vec/sink/writer/vwal_writer.h
@@ -44,8 +44,6 @@ private:
int64_t _db_id;
int64_t _tb_id;
int64_t _wal_id;
- // TODO version should in olap/wal_writer
- uint32_t _version = 0;
std::string _label;
WalManager* _wal_manager;
std::vector<TSlotDescriptor>& _slot_descs;
diff --git a/be/test/exec/test_data/wal_scanner/wal
b/be/test/exec/test_data/wal_scanner/wal
deleted file mode 100644
index 2c5fe90963d..00000000000
Binary files a/be/test/exec/test_data/wal_scanner/wal and /dev/null differ
diff --git a/be/test/exec/test_data/wal_scanner/wal_version0
b/be/test/exec/test_data/wal_scanner/wal_version0
new file mode 100644
index 00000000000..ddce7508946
Binary files /dev/null and b/be/test/exec/test_data/wal_scanner/wal_version0
differ
diff --git a/be/test/exec/test_data/wal_scanner/wal_version1
b/be/test/exec/test_data/wal_scanner/wal_version1
new file mode 100644
index 00000000000..f84b280b1f5
Binary files /dev/null and b/be/test/exec/test_data/wal_scanner/wal_version1
differ
diff --git a/be/test/vec/exec/vwal_scanner_test.cpp
b/be/test/vec/exec/vwal_scanner_test.cpp
index af292640368..cb32fef7406 100644
--- a/be/test/vec/exec/vwal_scanner_test.cpp
+++ b/be/test/vec/exec/vwal_scanner_test.cpp
@@ -64,10 +64,13 @@ private:
std::string _wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test";
int64_t _db_id = 1;
int64_t _tb_id = 2;
- int64_t _txn_id = 789;
- int64_t _version = 0;
+ int64_t _txn_id_1 = 123;
+ int64_t _txn_id_2 = 456;
+ uint32_t _version_0 = 0;
+ uint32_t _version_1 = 1;
int64_t _backend_id = 1001;
- std::string _label = "test";
+ std::string _label_1 = "test1";
+ std::string _label_2 = "test2";
TupleId _dst_tuple_id = 0;
RuntimeState _runtime_state;
@@ -238,8 +241,6 @@ void VWalScannerTest::init() {
_scan_range.params.format_type = TFileFormatType::FORMAT_WAL;
_kv_cache.reset(new ShardedKVCache(48));
- _runtime_state._wal_id = _txn_id;
-
_master_info.reset(new TMasterInfo());
_env = ExecEnv::GetInstance();
_env->_master_info = _master_info.get();
@@ -249,11 +250,19 @@ void VWalScannerTest::init() {
_env->_wal_manager = WalManager::create_shared(_env, _wal_dir);
std::string base_path;
auto st = _env->_wal_manager->_init_wal_dirs_info();
- st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id, _label,
base_path);
- std::string src = "./be/test/exec/test_data/wal_scanner/wal";
+ st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id_1,
_label_1, base_path,
+ _version_0);
+ std::string src = "./be/test/exec/test_data/wal_scanner/wal_version0";
std::string dst = _wal_dir + "/" + std::to_string(_db_id) + "/" +
std::to_string(_tb_id) + "/" +
- std::to_string(_version) + "_" +
std::to_string(_backend_id) + "_" +
- std::to_string(_txn_id) + "_" + _label;
+ std::to_string(_version_0) + "_" +
std::to_string(_backend_id) + "_" +
+ std::to_string(_txn_id_1) + "_" + _label_1;
+ std::filesystem::copy(src, dst);
+ st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id_2,
_label_2, base_path,
+ _version_1);
+ src = "./be/test/exec/test_data/wal_scanner/wal_version1";
+ dst = _wal_dir + "/" + std::to_string(_db_id) + "/" +
std::to_string(_tb_id) + "/" +
+ std::to_string(_version_1) + "_" + std::to_string(_backend_id) + "_"
+
+ std::to_string(_txn_id_2) + "_" + _label_2;
std::filesystem::copy(src, dst);
}
@@ -269,6 +278,8 @@ void
VWalScannerTest::generate_scanner(std::shared_ptr<VFileScanner>& scanner) {
}
TEST_F(VWalScannerTest, normal) {
+ // read wal file with wal_version=0
+ _runtime_state._wal_id = _txn_id_1;
std::shared_ptr<VFileScanner> scanner = nullptr;
generate_scanner(scanner);
std::unique_ptr<vectorized::Block> block(new vectorized::Block());
@@ -282,6 +293,19 @@ TEST_F(VWalScannerTest, normal) {
EXPECT_EQ(0, block->rows());
ASSERT_TRUE(eof);
WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner");
+ // read wal file with wal_version=1
+ eof = false;
+ _runtime_state._wal_id = _txn_id_2;
+ generate_scanner(scanner);
+ st = scanner->get_block(&_runtime_state, block.get(), &eof);
+ ASSERT_TRUE(st.ok());
+ EXPECT_EQ(3, block->rows());
+ block->clear();
+ st = scanner->get_block(&_runtime_state, block.get(), &eof);
+ ASSERT_TRUE(st.ok());
+ EXPECT_EQ(0, block->rows());
+ ASSERT_TRUE(eof);
+ WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner");
}
TEST_F(VWalScannerTest, fail_with_not_equal) {
@@ -296,6 +320,7 @@ TEST_F(VWalScannerTest, fail_with_not_equal) {
[](auto&& args) { *try_any_cast<size_t*>(args[0]) = 2;
});
sp->enable_processing();
+ _runtime_state._wal_id = _txn_id_1;
std::shared_ptr<VFileScanner> scanner = nullptr;
generate_scanner(scanner);
std::unique_ptr<vectorized::Block> block(new vectorized::Block());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]