This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ebabec1d3e6 [fix](migrate) Copy binlog files (#41083)
ebabec1d3e6 is described below
commit ebabec1d3e60b5b4ee8d40ea26dd52a8909f14fe
Author: walter <[email protected]>
AuthorDate: Sat Sep 21 09:19:26 2024 +0800
[fix](migrate) Copy binlog files (#41083)
---
be/src/olap/rowset/rowset_meta_manager.cpp | 63 +++++++++++++
be/src/olap/rowset/rowset_meta_manager.h | 3 +
be/src/olap/tablet.cpp | 5 +
be/src/olap/tablet.h | 1 +
be/src/olap/tablet_manager.cpp | 1 +
be/src/olap/task/engine_storage_migration_task.cpp | 102 ++++++++++++++++++++-
be/src/olap/task/engine_storage_migration_task.h | 5 +-
7 files changed, 176 insertions(+), 4 deletions(-)
diff --git a/be/src/olap/rowset/rowset_meta_manager.cpp
b/be/src/olap/rowset/rowset_meta_manager.cpp
index 9d1cbd88589..9ba6f9540db 100644
--- a/be/src/olap/rowset/rowset_meta_manager.cpp
+++ b/be/src/olap/rowset/rowset_meta_manager.cpp
@@ -357,6 +357,69 @@ Status
RowsetMetaManager::_get_rowset_binlog_metas(OlapMeta* meta, const TabletU
return status;
}
+// Collects the binlog metas of a tablet for every version inside `version`.
+//
+// Scans the binlog meta keys of `tablet_uid`, starting at the key prefix built for
+// `version.first` and stopping at the first key that sorts after the key prefix built
+// for `version.second + 1`. Every entry found is appended to `metas_pb`, together with
+// the matching binlog data record read from `meta`.
+//
+// Returns the iteration error if `meta->iterate` fails. Otherwise returns `status`,
+// which is only overwritten when handling an entry fails (unexpected key, unparsable
+// value, or binlog data lookup failure). Entries appended before an error are left in
+// `metas_pb`.
+Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, TabletUid tablet_uid,
+                                                  Version version, RowsetBinlogMetasPB* metas_pb) {
+    Status status;
+    auto tablet_uid_str = tablet_uid.to_string();
+    auto prefix_key = make_binlog_meta_key_prefix(tablet_uid);
+    auto begin_key = make_binlog_meta_key_prefix(tablet_uid, version.first);
+    auto end_key = make_binlog_meta_key_prefix(tablet_uid, version.second + 1);
+    // The callback returns true to continue with the next key and false to stop the scan.
+    auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str, &end_key](
+                                 std::string_view key, std::string_view value) -> bool {
+        VLOG_DEBUG << fmt::format("get rowset binlog metas, key={}, value={}", key, value);
+        if (key.compare(end_key) > 0) { // the binlog meta key is binary comparable.
+            // All binlog meta has been scanned
+            return false;
+        }
+
+        if (!starts_with_binlog_meta(key)) {
+            auto err_msg = fmt::format("invalid binlog meta key:{}", key);
+            status = Status::InternalError(err_msg);
+            LOG(WARNING) << err_msg;
+            return false;
+        }
+
+        BinlogMetaEntryPB binlog_meta_entry_pb;
+        if (!binlog_meta_entry_pb.ParseFromArray(value.data(), value.size())) {
+            auto err_msg = fmt::format("fail to parse binlog meta value:{}", value);
+            status = Status::InternalError(err_msg);
+            LOG(WARNING) << err_msg;
+            return false;
+        }
+
+        const auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2();
+        auto* binlog_meta_pb = metas_pb->add_rowset_binlog_metas();
+        binlog_meta_pb->set_rowset_id(rowset_id);
+        binlog_meta_pb->set_version(binlog_meta_entry_pb.version());
+        binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments());
+        binlog_meta_pb->set_meta_key(std::string {key});
+        binlog_meta_pb->set_meta(std::string {value});
+
+        auto binlog_data_key =
+                make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id);
+        std::string binlog_data;
+        status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data);
+        if (!status.ok()) {
+            LOG(WARNING) << status.to_string();
+            return false;
+        }
+        binlog_meta_pb->set_data_key(binlog_data_key);
+        binlog_meta_pb->set_data(binlog_data);
+
+        // Keep iterating so that every version in the range is collected. Returning false
+        // here would end the scan after the first entry and silently drop the binlog
+        // metas of the remaining versions; the end_key check above terminates the scan.
+        return true;
+    };
+
+    Status iterStatus =
+            meta->iterate(META_COLUMN_FAMILY_INDEX, begin_key, prefix_key, traverse_func);
+    if (!iterStatus.ok()) {
+        LOG(WARNING) << fmt::format(
+                "fail to iterate binlog meta. prefix_key:{}, version:{}, status:{}", prefix_key,
+                version.to_string(), iterStatus.to_string());
+        return iterStatus;
+    }
+    return status;
+}
+
Status RowsetMetaManager::_get_all_rowset_binlog_metas(OlapMeta* meta, const
TabletUid tablet_uid,
RowsetBinlogMetasPB*
metas_pb) {
Status status;
diff --git a/be/src/olap/rowset/rowset_meta_manager.h
b/be/src/olap/rowset/rowset_meta_manager.h
index b61e8c02769..eb04128fded 100644
--- a/be/src/olap/rowset/rowset_meta_manager.h
+++ b/be/src/olap/rowset/rowset_meta_manager.h
@@ -72,6 +72,9 @@ public:
static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid
tablet_uid,
const std::vector<int64_t>&
binlog_versions,
RowsetBinlogMetasPB* metas_pb);
+    // Get all binlog metas of a tablet whose versions fall within the given version range.
+ static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid
tablet_uid,
+ Version version,
RowsetBinlogMetasPB* metas_pb);
static Status remove_binlog(OlapMeta* meta, const std::string& suffix);
static Status ingest_binlog_metas(OlapMeta* meta, TabletUid tablet_uid,
RowsetBinlogMetasPB* metas_pb);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index b23404583f7..51eabe5495e 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2512,6 +2512,11 @@ Status Tablet::get_rowset_binlog_metas(const
std::vector<int64_t>& binlog_versio
binlog_versions,
metas_pb);
}
+// Version-range overload: reads the binlog metas covered by `binlog_versions` from the
+// meta store of this tablet's data dir and appends them to `metas_pb`.
+Status Tablet::get_rowset_binlog_metas(Version binlog_versions, RowsetBinlogMetasPB* metas_pb) {
+    OlapMeta* olap_meta = _data_dir->get_meta();
+    return RowsetMetaManager::get_rowset_binlog_metas(olap_meta, tablet_uid(), binlog_versions,
+                                                      metas_pb);
+}
+
std::string Tablet::get_segment_filepath(std::string_view rowset_id,
std::string_view segment_index) const
{
return fmt::format("{}/_binlog/{}_{}.dat", _tablet_path, rowset_id,
segment_index);
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 800c720a1c4..33253e82ced 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -436,6 +436,7 @@ public:
std::string_view rowset_id) const;
Status get_rowset_binlog_metas(const std::vector<int64_t>& binlog_versions,
RowsetBinlogMetasPB* metas_pb);
+ Status get_rowset_binlog_metas(Version binlog_versions,
RowsetBinlogMetasPB* metas_pb);
std::string get_segment_filepath(std::string_view rowset_id,
std::string_view segment_index) const;
std::string get_segment_filepath(std::string_view rowset_id, int64_t
segment_index) const;
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index e7679da0603..468a6b2fb12 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -972,6 +972,7 @@ Status TabletManager::load_tablet_from_dir(DataDir* store,
TTabletId tablet_id,
if (binlog_meta_filesize > 0) {
contain_binlog = true;
RETURN_IF_ERROR(read_pb(binlog_metas_file,
&rowset_binlog_metas_pb));
+ VLOG_DEBUG << "load rowset binlog metas from file. file_path=" <<
binlog_metas_file;
}
RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(binlog_metas_file));
}
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp
b/be/src/olap/task/engine_storage_migration_task.cpp
index 7c870a5e8ea..21be34a334d 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -37,6 +37,7 @@
#include "olap/data_dir.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
+#include "olap/pb_helper.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/snapshot_manager.h"
#include "olap/storage_engine.h"
@@ -262,9 +263,11 @@ Status EngineStorageMigrationTask::_migrate() {
}
std::vector<RowsetSharedPtr> temp_consistent_rowsets(consistent_rowsets);
+ RowsetBinlogMetasPB rowset_binlog_metas_pb;
do {
// migrate all index and data files but header file
- res = _copy_index_and_data_files(full_path, temp_consistent_rowsets);
+ res = _copy_index_and_data_files(full_path, temp_consistent_rowsets,
+ &rowset_binlog_metas_pb);
if (!res.ok()) {
break;
}
@@ -292,7 +295,8 @@ Status EngineStorageMigrationTask::_migrate() {
// we take the lock to complete it to avoid long-term competition
with other tasks
if (_is_rowsets_size_less_than_threshold(temp_consistent_rowsets))
{
// force to copy the remaining data and index
- res = _copy_index_and_data_files(full_path,
temp_consistent_rowsets);
+ res = _copy_index_and_data_files(full_path,
temp_consistent_rowsets,
+ &rowset_binlog_metas_pb);
if (!res.ok()) {
break;
}
@@ -307,6 +311,16 @@ Status EngineStorageMigrationTask::_migrate() {
}
}
+ // save rowset binlog metas
+ if (rowset_binlog_metas_pb.rowset_binlog_metas_size() > 0) {
+ auto rowset_binlog_metas_pb_filename =
+ fmt::format("{}/rowset_binlog_metas.pb", full_path);
+ res = write_pb(rowset_binlog_metas_pb_filename,
rowset_binlog_metas_pb);
+ if (!res.ok()) {
+ break;
+ }
+ }
+
// generate new tablet meta and write to hdr file
res = _gen_and_write_header_to_hdr_file(shard, full_path,
consistent_rowsets, end_version);
if (!res.ok()) {
@@ -350,10 +364,92 @@ void EngineStorageMigrationTask::_generate_new_header(
}
Status EngineStorageMigrationTask::_copy_index_and_data_files(
- const string& full_path, const std::vector<RowsetSharedPtr>&
consistent_rowsets) const {
+ const string& full_path, const std::vector<RowsetSharedPtr>&
consistent_rowsets,
+ RowsetBinlogMetasPB* all_binlog_metas_pb) const {
+ RowsetBinlogMetasPB rowset_binlog_metas_pb;
for (const auto& rs : consistent_rowsets) {
RETURN_IF_ERROR(rs->copy_files_to(full_path, rs->rowset_id()));
+
+ Version binlog_versions = rs->version();
+ RETURN_IF_ERROR(_tablet->get_rowset_binlog_metas(binlog_versions,
&rowset_binlog_metas_pb));
+ }
+
+ // copy index binlog files.
+ for (const auto& rowset_binlog_meta :
rowset_binlog_metas_pb.rowset_binlog_metas()) {
+ auto num_segments = rowset_binlog_meta.num_segments();
+ std::string_view rowset_id = rowset_binlog_meta.rowset_id();
+
+ RowsetMetaPB rowset_meta_pb;
+ if (!rowset_meta_pb.ParseFromString(rowset_binlog_meta.data())) {
+ auto err_msg = fmt::format("fail to parse binlog meta data
value:{}",
+ rowset_binlog_meta.data());
+ LOG(WARNING) << err_msg;
+ return Status::InternalError(err_msg);
+ }
+ const auto& tablet_schema_pb = rowset_meta_pb.tablet_schema();
+ TabletSchema tablet_schema;
+ tablet_schema.init_from_pb(tablet_schema_pb);
+
+ // copy segment files and index files
+ for (int64_t segment_index = 0; segment_index < num_segments;
++segment_index) {
+ std::string segment_file_path =
_tablet->get_segment_filepath(rowset_id, segment_index);
+ auto snapshot_segment_file_path =
+ fmt::format("{}/{}_{}.binlog", full_path, rowset_id,
segment_index);
+
+ Status status =
io::global_local_filesystem()->copy_path(segment_file_path,
+
snapshot_segment_file_path);
+ if (!status.ok()) {
+ LOG(WARNING) << "fail to copy binlog segment file. [src=" <<
segment_file_path
+ << ", dest=" << snapshot_segment_file_path << "]"
<< status;
+ return status;
+ }
+ VLOG_DEBUG << "copy " << segment_file_path << " to " <<
snapshot_segment_file_path;
+
+ if (tablet_schema.get_inverted_index_storage_format() ==
+ InvertedIndexStorageFormatPB::V1) {
+ for (const auto& index : tablet_schema.indexes()) {
+ if (index.index_type() != IndexType::INVERTED) {
+ continue;
+ }
+ auto index_id = index.index_id();
+ auto index_file =
+ _tablet->get_segment_index_filepath(rowset_id,
segment_index, index_id);
+ auto snapshot_segment_index_file_path =
+ fmt::format("{}/{}_{}_{}.binlog-index", full_path,
rowset_id,
+ segment_index, index_id);
+ VLOG_DEBUG << "copy " << index_file << " to "
+ << snapshot_segment_index_file_path;
+ status = io::global_local_filesystem()->copy_path(
+ index_file, snapshot_segment_index_file_path);
+ if (!status.ok()) {
+ LOG(WARNING)
+ << "fail to copy binlog index file. [src=" <<
index_file
+ << ", dest=" <<
snapshot_segment_index_file_path << "]" << status;
+ return status;
+ }
+ }
+ } else if (tablet_schema.has_inverted_index()) {
+ auto index_file =
InvertedIndexDescriptor::get_index_file_path_v2(
+
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path));
+ auto snapshot_segment_index_file_path =
+ fmt::format("{}/{}_{}.binlog-index", full_path,
rowset_id, segment_index);
+ VLOG_DEBUG << "copy " << index_file << " to " <<
snapshot_segment_index_file_path;
+ status = io::global_local_filesystem()->copy_path(index_file,
+
snapshot_segment_index_file_path);
+ if (!status.ok()) {
+ LOG(WARNING) << "fail to copy binlog index file. [src=" <<
index_file
+ << ", dest=" <<
snapshot_segment_index_file_path << "]" << status;
+ return status;
+ }
+ }
+ }
}
+
+ std::move(rowset_binlog_metas_pb.mutable_rowset_binlog_metas()->begin(),
+ rowset_binlog_metas_pb.mutable_rowset_binlog_metas()->end(),
+ google::protobuf::RepeatedFieldBackInserter(
+ all_binlog_metas_pb->mutable_rowset_binlog_metas()));
+
return Status::OK();
}
diff --git a/be/src/olap/task/engine_storage_migration_task.h
b/be/src/olap/task/engine_storage_migration_task.h
index 8858854de92..7578b7de94f 100644
--- a/be/src/olap/task/engine_storage_migration_task.h
+++ b/be/src/olap/task/engine_storage_migration_task.h
@@ -17,6 +17,8 @@
#pragma once
+#include <gen_cpp/olap_file.pb.h>
+
#include <mutex>
#include <shared_mutex>
#include <string>
@@ -69,7 +71,8 @@ private:
// TODO: hkp
// rewrite this function
Status _copy_index_and_data_files(const std::string& full_path,
- const std::vector<RowsetSharedPtr>&
consistent_rowsets) const;
+ const std::vector<RowsetSharedPtr>&
consistent_rowsets,
+ RowsetBinlogMetasPB*
all_binlog_metas_pb) const;
private:
StorageEngine& _engine;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]