This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4599c4e474c [fix](group commit) Fix compatibility issues on serializing and deserializing wal file (#32299)
4599c4e474c is described below

commit 4599c4e474cb89ccbffc522c5141f32d69539231
Author: huanghaibin <[email protected]>
AuthorDate: Tue Mar 19 22:04:45 2024 +0800

    [fix](group commit) Fix compatibility issues on serializing and deserializing wal file (#32299)
---
 be/src/olap/wal/wal_manager.cpp                 |   5 +--
 be/src/olap/wal/wal_manager.h                   |   7 ++--
 be/src/olap/wal/wal_reader.cpp                  |   4 +--
 be/src/olap/wal/wal_reader.h                    |   3 +-
 be/src/olap/wal/wal_table.cpp                   |   5 ++-
 be/src/olap/wal/wal_writer.cpp                  |   4 +--
 be/src/olap/wal/wal_writer.h                    |   2 +-
 be/src/runtime/group_commit_mgr.cpp             |  20 +++++++----
 be/src/vec/exec/format/wal/wal_reader.cpp       |  11 +++++-
 be/src/vec/exec/format/wal/wal_reader.h         |   1 +
 be/src/vec/sink/writer/vwal_writer.cpp          |   2 +-
 be/src/vec/sink/writer/vwal_writer.h            |   2 --
 be/test/exec/test_data/wal_scanner/wal          | Bin 180 -> 0 bytes
 be/test/exec/test_data/wal_scanner/wal_version0 | Bin 0 -> 220 bytes
 be/test/exec/test_data/wal_scanner/wal_version1 | Bin 0 -> 272 bytes
 be/test/vec/exec/vwal_scanner_test.cpp          |  43 +++++++++++++++++++-----
 16 files changed, 78 insertions(+), 31 deletions(-)

diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp
index 10a979f89dd..cce5d14d69e 100644
--- a/be/src/olap/wal/wal_manager.cpp
+++ b/be/src/olap/wal/wal_manager.cpp
@@ -205,11 +205,12 @@ size_t WalManager::get_wal_queue_size(int64_t table_id) {
 }
 
 Status WalManager::create_wal_path(int64_t db_id, int64_t table_id, int64_t 
wal_id,
-                                   const std::string& label, std::string& 
base_path) {
+                                   const std::string& label, std::string& 
base_path,
+                                   uint32_t wal_version) {
     base_path = _wal_dirs_info->get_available_random_wal_dir();
     std::stringstream ss;
     ss << base_path << "/" << std::to_string(db_id) << "/" << 
std::to_string(table_id) << "/"
-       << _wal_version << "_" << _exec_env->master_info()->backend_id << "_"
+       << std::to_string(wal_version) << "_" << 
_exec_env->master_info()->backend_id << "_"
        << std::to_string(wal_id) << "_" << label;
     {
         std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
diff --git a/be/src/olap/wal/wal_manager.h b/be/src/olap/wal/wal_manager.h
index 3710b9e31ca..9cc9f95bd79 100644
--- a/be/src/olap/wal/wal_manager.h
+++ b/be/src/olap/wal/wal_manager.h
@@ -73,7 +73,7 @@ public:
 
     // replay wal
     Status create_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id,
-                           const std::string& label, std::string& base_path);
+                           const std::string& label, std::string& base_path, 
uint32_t wal_version);
     Status get_wal_path(int64_t wal_id, std::string& wal_path);
     Status delete_wal(int64_t table_id, int64_t wal_id);
     Status rename_to_tmp_path(const std::string wal, int64_t table_id, int64_t 
wal_id);
@@ -144,7 +144,6 @@ private:
     std::shared_mutex _wal_queue_lock;
     std::unordered_map<int64_t, std::set<int64_t>> _wal_queues;
 
-    int64_t _wal_version = 0;
     std::atomic<bool> _first_replay;
 
     // for test relay
@@ -154,4 +153,8 @@ private:
     std::shared_mutex _wal_cv_lock;
     std::unordered_map<int64_t, WalCvInfo> _wal_cv_map;
 };
+
+// In doris 2.1.0, wal version is 0, now need to upgrade it to 1 to solve 
compatibility issues.
+// see https://github.com/apache/doris/pull/32299
+constexpr inline uint32_t WAL_VERSION = 1;
 } // namespace doris
diff --git a/be/src/olap/wal/wal_reader.cpp b/be/src/olap/wal/wal_reader.cpp
index d1263daf1c5..9e4618b2bc1 100644
--- a/be/src/olap/wal/wal_reader.cpp
+++ b/be/src/olap/wal/wal_reader.cpp
@@ -85,7 +85,7 @@ Status WalReader::read_block(PBlock& block) {
     return Status::OK();
 }
 
-Status WalReader::read_header(std::string& col_ids) {
+Status WalReader::read_header(uint32_t& version, std::string& col_ids) {
     if (file_reader->size() == 0) {
         return Status::DataQualityError("empty file");
     }
@@ -101,7 +101,7 @@ Status WalReader::read_header(std::string& col_ids) {
     RETURN_IF_ERROR(
             file_reader->read_at(_offset, {version_buf, 
WalWriter::VERSION_SIZE}, &bytes_read));
     _offset += WalWriter::VERSION_SIZE;
-    _version = decode_fixed32_le(version_buf);
+    version = decode_fixed32_le(version_buf);
     uint8_t len_buf[WalWriter::LENGTH_SIZE];
     RETURN_IF_ERROR(file_reader->read_at(_offset, {len_buf, 
WalWriter::LENGTH_SIZE}, &bytes_read));
     _offset += WalWriter::LENGTH_SIZE;
diff --git a/be/src/olap/wal/wal_reader.h b/be/src/olap/wal/wal_reader.h
index 1f26a7598f0..c47d029331a 100644
--- a/be/src/olap/wal/wal_reader.h
+++ b/be/src/olap/wal/wal_reader.h
@@ -32,13 +32,12 @@ public:
     Status finalize();
 
     Status read_block(PBlock& block);
-    Status read_header(std::string& col_ids);
+    Status read_header(uint32_t& version, std::string& col_ids);
 
 private:
     Status _check_checksum(const char* binary, size_t size, uint32_t checksum);
 
     std::string _file_name;
-    uint32_t _version = 0;
     size_t _offset;
     io::FileReaderSPtr file_reader;
 };
diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp
index 14e3779748c..641ef8c6647 100644
--- a/be/src/olap/wal/wal_table.cpp
+++ b/be/src/olap/wal/wal_table.cpp
@@ -321,7 +321,10 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t 
tb_id,
 Status WalTable::_read_wal_header(const std::string& wal_path, std::string& 
columns) {
     std::shared_ptr<doris::WalReader> wal_reader = 
std::make_shared<WalReader>(wal_path);
     RETURN_IF_ERROR(wal_reader->init());
-    RETURN_IF_ERROR(wal_reader->read_header(columns));
+    uint32_t version = 0;
+    RETURN_IF_ERROR(wal_reader->read_header(version, columns));
+    VLOG_DEBUG << "wal=" << wal_path << ",version=" << std::to_string(version)
+               << ",columns=" << columns;
     RETURN_IF_ERROR(wal_reader->finalize());
     return Status::OK();
 }
diff --git a/be/src/olap/wal/wal_writer.cpp b/be/src/olap/wal/wal_writer.cpp
index 43f021c9d31..5658eded564 100644
--- a/be/src/olap/wal/wal_writer.cpp
+++ b/be/src/olap/wal/wal_writer.cpp
@@ -90,7 +90,7 @@ Status WalWriter::append_blocks(const PBlockArray& blocks) {
     return Status::OK();
 }
 
-Status WalWriter::append_header(uint32_t version, std::string col_ids) {
+Status WalWriter::append_header(std::string col_ids) {
     if (!_file_writer) {
         return Status::InternalError("wal writer is null,fail to write 
file={}", _file_name);
     }
@@ -105,7 +105,7 @@ Status WalWriter::append_header(uint32_t version, 
std::string col_ids) {
     offset += k_wal_magic_length;
 
     uint8_t version_buf[sizeof(uint32_t)];
-    encode_fixed32_le(version_buf, version);
+    encode_fixed32_le(version_buf, WAL_VERSION);
     RETURN_IF_ERROR(_file_writer->append({version_buf, sizeof(uint32_t)}));
     offset += VERSION_SIZE;
     uint8_t len_buf[sizeof(uint64_t)];
diff --git a/be/src/olap/wal/wal_writer.h b/be/src/olap/wal/wal_writer.h
index 08d2a4eb710..f730e026660 100644
--- a/be/src/olap/wal/wal_writer.h
+++ b/be/src/olap/wal/wal_writer.h
@@ -36,7 +36,7 @@ public:
     Status finalize();
 
     Status append_blocks(const PBlockArray& blocks);
-    Status append_header(uint32_t version, std::string col_ids);
+    Status append_header(std::string col_ids);
 
     std::string file_name() { return _file_name; };
 
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index cadff231761..646b58c5fdc 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -314,8 +314,12 @@ Status GroupCommitTable::_create_group_commit_load(int 
be_exe_version) {
     st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, 
params,
                              pipeline_params);
     if (!st.ok()) {
-        static_cast<void>(_finish_group_commit_load(_db_id, _table_id, label, 
txn_id, instance_id,
-                                                    st, nullptr));
+        auto finish_st = _finish_group_commit_load(_db_id, _table_id, label, 
txn_id, instance_id,
+                                                   st, nullptr);
+        if (!finish_st.ok()) {
+            LOG(WARNING) << "finish group commit error, label=" << label
+                         << ", st=" << finish_st.to_string();
+        }
     }
     return st;
 }
@@ -432,8 +436,12 @@ Status GroupCommitTable::_exec_plan_fragment(int64_t 
db_id, int64_t table_id,
                                              const TExecPlanFragmentParams& 
params,
                                              const TPipelineFragmentParams& 
pipeline_params) {
     auto finish_cb = [db_id, table_id, label, txn_id, this](RuntimeState* 
state, Status* status) {
-        static_cast<void>(_finish_group_commit_load(db_id, table_id, label, 
txn_id,
-                                                    
state->fragment_instance_id(), *status, state));
+        auto finish_st = _finish_group_commit_load(db_id, table_id, label, 
txn_id,
+                                                   
state->fragment_instance_id(), *status, state);
+        if (!finish_st.ok()) {
+            LOG(WARNING) << "finish group commit error, label=" << label
+                         << ", st=" << finish_st.to_string();
+        }
     };
     if (is_pipeline) {
         return _exec_env->fragment_mgr()->exec_plan_fragment(pipeline_params, 
finish_cb);
@@ -512,8 +520,8 @@ Status LoadBlockQueue::create_wal(int64_t db_id, int64_t 
tb_id, int64_t wal_id,
     std::string real_label = config::group_commit_wait_replay_wal_finish
                                      ? import_label + "_test_wait"
                                      : import_label;
-    RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->create_wal_path(db_id, 
tb_id, wal_id,
-                                                                       
real_label, _wal_base_path));
+    RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->create_wal_path(
+            db_id, tb_id, wal_id, real_label, _wal_base_path, WAL_VERSION));
     _v_wal_writer = std::make_shared<vectorized::VWalWriter>(
             db_id, tb_id, wal_id, real_label, wal_manager, slot_desc, 
be_exe_version);
     return _v_wal_writer->init();
diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp 
b/be/src/vec/exec/format/wal/wal_reader.cpp
index 981e740d05c..1fbd395ad26 100644
--- a/be/src/vec/exec/format/wal/wal_reader.cpp
+++ b/be/src/vec/exec/format/wal/wal_reader.cpp
@@ -47,6 +47,15 @@ Status WalReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
     //read src block
     PBlock pblock;
     auto st = _wal_reader->read_block(pblock);
+    // Due to historical reasons, be_exec_version=3 will use the new way to 
serialize block
+    // in doris 2.1.0, now it has been corrected to use the old way to do 
serialize and deserialize
+    // in the latest version. So if a wal is created by 2.1.0 (wal version=0 
&& be_exec_version=3),
+    // it should upgrade the be_exec_version to 4 to use the new way to 
deserialize pblock to solve
+    // compatibility issues.see https://github.com/apache/doris/pull/32299
+    if (_version == 0 && pblock.has_be_exec_version() && 
pblock.be_exec_version() == 3) {
+        VLOG_DEBUG << "need to set be_exec_version to 4 to solve compatibility 
issues";
+        pblock.set_be_exec_version(4);
+    }
     if (st.is<ErrorCode::END_OF_FILE>()) {
         LOG(INFO) << "read eof on wal:" << _wal_path;
         *read_rows = 0;
@@ -99,7 +108,7 @@ Status WalReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
 Status WalReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
                               std::unordered_set<std::string>* missing_cols) {
     std::string col_ids;
-    RETURN_IF_ERROR(_wal_reader->read_header(col_ids));
+    RETURN_IF_ERROR(_wal_reader->read_header(_version, col_ids));
     std::vector<std::string> column_id_vector =
             strings::Split(col_ids, ",", strings::SkipWhitespace());
     _column_id_count = column_id_vector.size();
diff --git a/be/src/vec/exec/format/wal/wal_reader.h 
b/be/src/vec/exec/format/wal/wal_reader.h
index d2636d5495e..09311496c16 100644
--- a/be/src/vec/exec/format/wal/wal_reader.h
+++ b/be/src/vec/exec/format/wal/wal_reader.h
@@ -41,6 +41,7 @@ private:
     // column_id, column_pos
     std::map<int64_t, int64_t> _column_pos_map;
     int64_t _column_id_count;
+    uint32_t _version = 0;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/sink/writer/vwal_writer.cpp 
b/be/src/vec/sink/writer/vwal_writer.cpp
index 569f9bcd652..76e0bf0679c 100644
--- a/be/src/vec/sink/writer/vwal_writer.cpp
+++ b/be/src/vec/sink/writer/vwal_writer.cpp
@@ -60,7 +60,7 @@ Status VWalWriter::init() {
         ss << std::to_string(slot_desc.col_unique_id) << ",";
     }
     std::string col_ids = ss.str().substr(0, ss.str().size() - 1);
-    RETURN_IF_ERROR(_wal_writer->append_header(_version, col_ids));
+    RETURN_IF_ERROR(_wal_writer->append_header(col_ids));
     return Status::OK();
 }
 
diff --git a/be/src/vec/sink/writer/vwal_writer.h 
b/be/src/vec/sink/writer/vwal_writer.h
index f22250cb5d4..a9fa218f330 100644
--- a/be/src/vec/sink/writer/vwal_writer.h
+++ b/be/src/vec/sink/writer/vwal_writer.h
@@ -44,8 +44,6 @@ private:
     int64_t _db_id;
     int64_t _tb_id;
     int64_t _wal_id;
-    // TODO version should in olap/wal_writer
-    uint32_t _version = 0;
     std::string _label;
     WalManager* _wal_manager;
     std::vector<TSlotDescriptor>& _slot_descs;
diff --git a/be/test/exec/test_data/wal_scanner/wal 
b/be/test/exec/test_data/wal_scanner/wal
deleted file mode 100644
index 2c5fe90963d..00000000000
Binary files a/be/test/exec/test_data/wal_scanner/wal and /dev/null differ
diff --git a/be/test/exec/test_data/wal_scanner/wal_version0 
b/be/test/exec/test_data/wal_scanner/wal_version0
new file mode 100644
index 00000000000..ddce7508946
Binary files /dev/null and b/be/test/exec/test_data/wal_scanner/wal_version0 
differ
diff --git a/be/test/exec/test_data/wal_scanner/wal_version1 
b/be/test/exec/test_data/wal_scanner/wal_version1
new file mode 100644
index 00000000000..f84b280b1f5
Binary files /dev/null and b/be/test/exec/test_data/wal_scanner/wal_version1 
differ
diff --git a/be/test/vec/exec/vwal_scanner_test.cpp 
b/be/test/vec/exec/vwal_scanner_test.cpp
index af292640368..cb32fef7406 100644
--- a/be/test/vec/exec/vwal_scanner_test.cpp
+++ b/be/test/vec/exec/vwal_scanner_test.cpp
@@ -64,10 +64,13 @@ private:
     std::string _wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test";
     int64_t _db_id = 1;
     int64_t _tb_id = 2;
-    int64_t _txn_id = 789;
-    int64_t _version = 0;
+    int64_t _txn_id_1 = 123;
+    int64_t _txn_id_2 = 456;
+    uint32_t _version_0 = 0;
+    uint32_t _version_1 = 1;
     int64_t _backend_id = 1001;
-    std::string _label = "test";
+    std::string _label_1 = "test1";
+    std::string _label_2 = "test2";
 
     TupleId _dst_tuple_id = 0;
     RuntimeState _runtime_state;
@@ -238,8 +241,6 @@ void VWalScannerTest::init() {
     _scan_range.params.format_type = TFileFormatType::FORMAT_WAL;
     _kv_cache.reset(new ShardedKVCache(48));
 
-    _runtime_state._wal_id = _txn_id;
-
     _master_info.reset(new TMasterInfo());
     _env = ExecEnv::GetInstance();
     _env->_master_info = _master_info.get();
@@ -249,11 +250,19 @@ void VWalScannerTest::init() {
     _env->_wal_manager = WalManager::create_shared(_env, _wal_dir);
     std::string base_path;
     auto st = _env->_wal_manager->_init_wal_dirs_info();
-    st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id, _label, 
base_path);
-    std::string src = "./be/test/exec/test_data/wal_scanner/wal";
+    st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id_1, 
_label_1, base_path,
+                                             _version_0);
+    std::string src = "./be/test/exec/test_data/wal_scanner/wal_version0";
     std::string dst = _wal_dir + "/" + std::to_string(_db_id) + "/" + 
std::to_string(_tb_id) + "/" +
-                      std::to_string(_version) + "_" + 
std::to_string(_backend_id) + "_" +
-                      std::to_string(_txn_id) + "_" + _label;
+                      std::to_string(_version_0) + "_" + 
std::to_string(_backend_id) + "_" +
+                      std::to_string(_txn_id_1) + "_" + _label_1;
+    std::filesystem::copy(src, dst);
+    st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id_2, 
_label_2, base_path,
+                                             _version_1);
+    src = "./be/test/exec/test_data/wal_scanner/wal_version1";
+    dst = _wal_dir + "/" + std::to_string(_db_id) + "/" + 
std::to_string(_tb_id) + "/" +
+          std::to_string(_version_1) + "_" + std::to_string(_backend_id) + "_" 
+
+          std::to_string(_txn_id_2) + "_" + _label_2;
     std::filesystem::copy(src, dst);
 }
 
@@ -269,6 +278,8 @@ void 
VWalScannerTest::generate_scanner(std::shared_ptr<VFileScanner>& scanner) {
 }
 
 TEST_F(VWalScannerTest, normal) {
+    // read wal file with wal_version=0
+    _runtime_state._wal_id = _txn_id_1;
     std::shared_ptr<VFileScanner> scanner = nullptr;
     generate_scanner(scanner);
     std::unique_ptr<vectorized::Block> block(new vectorized::Block());
@@ -282,6 +293,19 @@ TEST_F(VWalScannerTest, normal) {
     EXPECT_EQ(0, block->rows());
     ASSERT_TRUE(eof);
     WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner");
+    // read wal file with wal_version=1
+    eof = false;
+    _runtime_state._wal_id = _txn_id_2;
+    generate_scanner(scanner);
+    st = scanner->get_block(&_runtime_state, block.get(), &eof);
+    ASSERT_TRUE(st.ok());
+    EXPECT_EQ(3, block->rows());
+    block->clear();
+    st = scanner->get_block(&_runtime_state, block.get(), &eof);
+    ASSERT_TRUE(st.ok());
+    EXPECT_EQ(0, block->rows());
+    ASSERT_TRUE(eof);
+    WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner");
 }
 
 TEST_F(VWalScannerTest, fail_with_not_equal) {
@@ -296,6 +320,7 @@ TEST_F(VWalScannerTest, fail_with_not_equal) {
                       [](auto&& args) { *try_any_cast<size_t*>(args[0]) = 2; 
});
     sp->enable_processing();
 
+    _runtime_state._wal_id = _txn_id_1;
     std::shared_ptr<VFileScanner> scanner = nullptr;
     generate_scanner(scanner);
     std::unique_ptr<vectorized::Block> block(new vectorized::Block());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to