acelyc111 commented on code in PR #2142:
URL:
https://github.com/apache/incubator-pegasus/pull/2142#discussion_r1840736407
##########
src/meta/meta_backup_engine.cpp:
##########
@@ -48,72 +48,93 @@
#include "utils/blob.h"
#include "utils/chrono_literals.h"
#include "utils/error_code.h"
+#include "utils/fail_point.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
#include "utils/zlocks.h"
namespace dsn {
namespace replication {
-backup_engine::backup_engine(backup_service *service)
- : _backup_service(service), _block_service(nullptr), _backup_path(""),
_is_backup_failed(false)
+meta_backup_engine::meta_backup_engine(meta_service *meta_svc, bool
is_periodic)
+ : _meta_svc(meta_svc), _is_periodic_backup(is_periodic)
{
}
-backup_engine::~backup_engine() { _tracker.cancel_outstanding_tasks(); }
+meta_backup_engine::~meta_backup_engine() {
_tracker.cancel_outstanding_tasks(); }
-error_code backup_engine::init_backup(int32_t app_id)
+// ThreadPool: THREAD_POOL_DEFAULT
+void meta_backup_engine::init_backup(int32_t app_id,
+ int32_t partition_count,
+ const std::string &app_name,
+ const std::string &provider,
+ const std::string &backup_root_path)
{
- std::string app_name;
- int partition_count;
- {
- zauto_read_lock l;
- _backup_service->get_state()->lock_read(l);
- std::shared_ptr<app_state> app =
_backup_service->get_state()->get_app(app_id);
- if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
- LOG_ERROR("app {} is not available, couldn't do backup now.",
app_id);
- return ERR_INVALID_STATE;
- }
- app_name = app->app_name;
- partition_count = app->partition_count;
- }
+ zauto_write_lock l(_lock);
- zauto_lock lock(_lock);
_backup_status.clear();
for (int i = 0; i < partition_count; ++i) {
- _backup_status.emplace(i, backup_status::UNALIVE);
+ _backup_status.emplace_back(backup_status::UNINITIALIZED);
}
_cur_backup.app_id = app_id;
_cur_backup.app_name = app_name;
_cur_backup.backup_id = static_cast<int64_t>(dsn_now_ms());
_cur_backup.start_time_ms = _cur_backup.backup_id;
- return ERR_OK;
+ _cur_backup.backup_provider_type = provider;
+ _cur_backup.backup_path = backup_root_path;
+ _cur_backup.status = backup_status::UNINITIALIZED;
+ _is_backup_failed = false;
+ _is_backup_canceled = false;
}
-error_code backup_engine::set_block_service(const std::string &provider)
+// ThreadPool: THREAD_POOL_DEFAULT
+void meta_backup_engine::start()
{
- _provider_type = provider;
- _block_service = _backup_service->get_meta_service()
- ->get_block_service_manager()
- .get_or_create_block_filesystem(provider);
- if (_block_service == nullptr) {
- return ERR_INVALID_PARAMETERS;
+ LOG_DEBUG("App[{}] start {} backup[{}] on {}, root_path = {}",
+ _cur_backup.app_name,
+ _is_periodic_backup ? "periodic" : "onetime",
+ _cur_backup.backup_id,
+ _cur_backup.backup_provider_type,
+ _cur_backup.backup_path);
Review Comment:
What about mention the URI of the backup destination?
##########
src/meta/meta_backup_engine.cpp:
##########
@@ -48,72 +48,93 @@
#include "utils/blob.h"
#include "utils/chrono_literals.h"
#include "utils/error_code.h"
+#include "utils/fail_point.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
#include "utils/zlocks.h"
namespace dsn {
namespace replication {
-backup_engine::backup_engine(backup_service *service)
- : _backup_service(service), _block_service(nullptr), _backup_path(""),
_is_backup_failed(false)
+meta_backup_engine::meta_backup_engine(meta_service *meta_svc, bool
is_periodic)
+ : _meta_svc(meta_svc), _is_periodic_backup(is_periodic)
{
}
-backup_engine::~backup_engine() { _tracker.cancel_outstanding_tasks(); }
+meta_backup_engine::~meta_backup_engine() {
_tracker.cancel_outstanding_tasks(); }
-error_code backup_engine::init_backup(int32_t app_id)
+// ThreadPool: THREAD_POOL_DEFAULT
+void meta_backup_engine::init_backup(int32_t app_id,
+ int32_t partition_count,
+ const std::string &app_name,
+ const std::string &provider,
+ const std::string &backup_root_path)
{
- std::string app_name;
- int partition_count;
- {
- zauto_read_lock l;
- _backup_service->get_state()->lock_read(l);
- std::shared_ptr<app_state> app =
_backup_service->get_state()->get_app(app_id);
- if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
- LOG_ERROR("app {} is not available, couldn't do backup now.",
app_id);
- return ERR_INVALID_STATE;
- }
- app_name = app->app_name;
- partition_count = app->partition_count;
- }
+ zauto_write_lock l(_lock);
- zauto_lock lock(_lock);
_backup_status.clear();
for (int i = 0; i < partition_count; ++i) {
- _backup_status.emplace(i, backup_status::UNALIVE);
+ _backup_status.emplace_back(backup_status::UNINITIALIZED);
}
_cur_backup.app_id = app_id;
_cur_backup.app_name = app_name;
_cur_backup.backup_id = static_cast<int64_t>(dsn_now_ms());
_cur_backup.start_time_ms = _cur_backup.backup_id;
- return ERR_OK;
+ _cur_backup.backup_provider_type = provider;
+ _cur_backup.backup_path = backup_root_path;
+ _cur_backup.status = backup_status::UNINITIALIZED;
+ _is_backup_failed = false;
+ _is_backup_canceled = false;
}
-error_code backup_engine::set_block_service(const std::string &provider)
+// ThreadPool: THREAD_POOL_DEFAULT
+void meta_backup_engine::start()
{
- _provider_type = provider;
- _block_service = _backup_service->get_meta_service()
- ->get_block_service_manager()
- .get_or_create_block_filesystem(provider);
- if (_block_service == nullptr) {
- return ERR_INVALID_PARAMETERS;
+ LOG_DEBUG("App[{}] start {} backup[{}] on {}, root_path = {}",
+ _cur_backup.app_name,
+ _is_periodic_backup ? "periodic" : "onetime",
+ _cur_backup.backup_id,
+ _cur_backup.backup_provider_type,
+ _cur_backup.backup_path);
+ error_code err = write_app_info();
+ if (err != ERR_OK) {
+ LOG_ERROR("backup_id({}): backup meta data for app {} failed, error
{}",
+ _cur_backup.backup_id,
+ _cur_backup.app_id,
+ err);
+ update_backup_item_on_remote_storage(backup_status::FAILED,
dsn_now_ms());
+ return;
+ }
+ update_backup_item_on_remote_storage(backup_status::CHECKPOINTING);
+ FAIL_POINT_INJECT_F("meta_backup_engine_start", [](absl::string_view) {});
+ for (auto i = 0; i < _backup_status.size(); ++i) {
+ zauto_write_lock l(_lock);
+ _backup_status[i] = backup_status::CHECKPOINTING;
+ tasking::enqueue(LPC_DEFAULT_CALLBACK, &_tracker, [this, i]() {
+ backup_app_partition(gpid(_cur_backup.app_id, i));
+ });
}
- return ERR_OK;
}
-error_code backup_engine::set_backup_path(const std::string &path)
+// ThreadPool: THREAD_POOL_DEFAULT
+error_code meta_backup_engine::write_app_info()
{
- if (_block_service && _block_service->is_root_path_set()) {
- return ERR_INVALID_PARAMETERS;
- }
- LOG_INFO("backup path is set to {}.", path);
- _backup_path = path;
+ // TODO(heyuchen): TBD
+ // guoningshen: write app info on block service
Review Comment:
```suggestion
// TODO(guoningshen): write app info on block service
```
##########
idl/backup.thrift:
##########
@@ -184,16 +184,28 @@ struct start_backup_app_response
3:optional i64 backup_id;
}
+enum backup_status
+{
+ UNINITIALIZED,
+ CHECKPOINTING,
+ CHECKPOINTED,
+ UPLOADING,
+ SUCCEED,
+ FAILED,
+ CANCELED
+}
+
struct backup_item
{
1:i64 backup_id;
- 2:string app_name;
- 3:string backup_provider_type;
+ 2:i32 app_id;
+ 3:string app_name;
+ 4:string backup_provider_type;
// user specified backup_path.
- 4:string backup_path;
- 5:i64 start_time_ms;
- 6:i64 end_time_ms;
- 7:bool is_backup_failed;
+ 5:string backup_path;
Review Comment:
While you change the field sequence, it would cause compatiable issues. How
do you resolve it?
It's just the backup policy is incompatiable, however, the backup data is
compatiable, that is to say, the backup data of the old implementation can be
read normally by the new implemantation, right?
##########
src/meta/meta_backup_engine.h:
##########
@@ -63,45 +56,81 @@ struct app_backup_info
DEFINE_JSON_SERIALIZATION(backup_id, start_time_ms, end_time_ms, app_id,
app_name)
};
-class backup_service;
-
-class backup_engine
+///
+/// Meta backup status
+///
+/// start backup
+/// |
+/// v Error/Cancel
+/// Checkpointing ------------->|
+/// | |
+/// v Error/Cancel |
+/// Uploading -------------->|
+/// | |
+/// v v
+/// Succeed Failed/Canceled
+///
+class meta_backup_engine
{
public:
- backup_engine(backup_service *service);
- ~backup_engine();
-
- error_code init_backup(int32_t app_id);
- error_code set_block_service(const std::string &provider);
- error_code set_backup_path(const std::string &path);
-
- error_code start();
+ explicit meta_backup_engine(meta_service *meta_svc, bool is_periodic);
+ ~meta_backup_engine();
int64_t get_current_backup_id() const { return _cur_backup.backup_id; }
int32_t get_backup_app_id() const { return _cur_backup.app_id; }
- bool is_in_progress() const;
- backup_item get_backup_item() const;
+ backup_item get_backup_item() const
+ {
+ zauto_read_lock l(_lock);
+ backup_item item = _cur_backup;
Review Comment:
```suggestion
{
zauto_read_lock l(_lock);
backup_item item = _cur_backup;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]