empiredan commented on code in PR #1399:
URL:
https://github.com/apache/incubator-pegasus/pull/1399#discussion_r1142869222
##########
src/replica/test/mock_utils.h:
##########
@@ -46,7 +46,7 @@ class mock_replication_app_base : public replication_app_base
explicit mock_replication_app_base(replica *replica) :
replication_app_base(replica) {}
error_code start(int, char **) override { return ERR_NOT_IMPLEMENTED; }
- error_code stop(bool) override { return ERR_NOT_IMPLEMENTED; }
+ error_code stop(bool) override { return ERR_OK; }
Review Comment:
Why change this to `ERR_OK`?
##########
src/common/fs_manager.cpp:
##########
@@ -162,6 +162,7 @@ fs_manager::fs_manager(bool for_test)
dir_node *fs_manager::get_dir_node(const std::string &subdir)
{
+ // TODO(yingchun): need lock?
Review Comment:
Yeah, I think it needs a read lock here:
```suggestion
zauto_read_lock l(_lock);
```
##########
src/replica/replica_stub.cpp:
##########
@@ -826,16 +826,16 @@ void replica_stub::initialize(const replication_options
&opts, bool clear /* = f
}
}
-void replica_stub::initialize_fs_manager(std::vector<std::string> &data_dirs,
- std::vector<std::string>
&data_dir_tags)
+void replica_stub::initialize_fs_manager(const std::vector<std::string>
&data_dirs,
+ const std::vector<std::string>
&data_dir_tags)
{
std::string cdir;
std::string err_msg;
int count = 0;
std::vector<std::string> available_dirs;
std::vector<std::string> available_dir_tags;
for (auto i = 0; i < data_dir_tags.size(); ++i) {
- std::string &dir = data_dirs[i];
+ const std::string &dir = data_dirs[i];
Review Comment:
```suggestion
const auto &dir = data_dirs[i];
```
##########
src/replica/replica_stub.cpp:
##########
@@ -3200,5 +3196,27 @@ void replica_stub::update_config(const std::string &name)
UPDATE_CONFIG(_config_sync_timer_task->update_interval,
config_sync_interval_ms, name);
}
+void replica_stub::wait_closing_replicas_finished()
+{
+ zauto_write_lock l(_replicas_lock);
+ while (!_closing_replicas.empty()) {
+ task_ptr task = std::get<0>(_closing_replicas.begin()->second);
+ gpid first_gpid = _closing_replicas.begin()->first;
Review Comment:
```suggestion
auto task = std::get<0>(_closing_replicas.begin()->second);
auto first_gpid = _closing_replicas.begin()->first;
```
##########
src/replica/replica_2pc.cpp:
##########
@@ -542,7 +545,10 @@ void replica::on_prepare(dsn::message_ex *request)
}
error_code err = _prepare_list->prepare(mu, status(),
pop_all_committed_mutations);
- CHECK_EQ_MSG(err, ERR_OK, "prepare mutation failed");
+ if (err != ERR_OK) {
Review Comment:
Since `prepare()` has the call chain `prepare_list::prepare() =>
prepare_list::commit() => replica::execute_mutation() =>
_app->apply_mutation() => pegasus_server_impl::on_batched_write_requests() =>
pegasus_server_write::on_batched_write_requests() =>
pegasus_server_write::on_batched_writes()`, in which the rocksdb write
interface is called, is it necessary to call `handle_local_failure()` on error?
##########
src/common/fs_manager.cpp:
##########
@@ -162,6 +162,7 @@ fs_manager::fs_manager(bool for_test)
dir_node *fs_manager::get_dir_node(const std::string &subdir)
Review Comment:
```suggestion
dir_node *fs_manager::get_dir_node(const std::string &subdir) const
```
##########
src/replica/prepare_list.cpp:
##########
@@ -164,46 +170,49 @@ void prepare_list::commit(decree d, commit_type ct)
_last_committed_decree++;
last_bt = mu->data.header.ballot;
- _committer(mu);
+ ERR_LOG_PREFIX_AND_RETURN_NOT_OK(_committer(mu),
+ "commit error in
COMMIT_TO_DECREE_HARD");
}
- return;
+ return ERR_OK;
}
case COMMIT_TO_DECREE_SOFT: {
for (decree d0 = last_committed_decree() + 1; d0 <= d; d0++) {
mutation_ptr mu = get_mutation_by_decree(d0);
if (mu != nullptr && mu->is_ready_for_commit() &&
mu->data.header.ballot >= last_bt) {
_last_committed_decree++;
last_bt = mu->data.header.ballot;
- _committer(mu);
+ ERR_LOG_PREFIX_AND_RETURN_NOT_OK(_committer(mu),
+ "commit error in
COMMIT_TO_DECREE_SOFT");
} else
break;
}
- return;
+ return ERR_OK;
}
case COMMIT_ALL_READY: {
- if (d != last_committed_decree() + 1)
- return;
+ if (d != last_committed_decree() + 1) {
+ return ERR_OK;
+ }
int count = 0;
mutation_ptr mu = get_mutation_by_decree(last_committed_decree() + 1);
while (mu != nullptr && mu->is_ready_for_commit() &&
mu->data.header.ballot >= last_bt) {
_last_committed_decree++;
last_bt = mu->data.header.ballot;
- _committer(mu);
+ ERR_LOG_PREFIX_AND_RETURN_NOT_OK(_committer(mu), "commit error in
COMMIT_ALL_READY");
Review Comment:
If this fails, should everything be rolled back?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]