Shuo-Jia commented on a change in pull request #897:
URL: https://github.com/apache/incubator-pegasus/pull/897#discussion_r801570815
##########
File path: src/server/pegasus_server_impl.cpp
##########
@@ -776,6 +776,123 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
_pfc_multi_get_latency->set(dsn_now_ns() - start_time);
}
+void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
+{
+ dassert(_is_open, "");
+ _pfc_batch_get_qps->increment();
+ int64_t start_time = dsn_now_ns();
+
+ auto &response = rpc.response();
+ response.app_id = _gpid.get_app_id();
+ response.partition_index = _gpid.get_partition_index();
+ response.server = _primary_address;
+
+ const auto &request = rpc.request();
+ if (request.keys.empty()) {
+ response.error = rocksdb::Status::kInvalidArgument;
+ derror_replica("Invalid argument for batch_get from {}: 'keys' field
in request is empty",
+ rpc.remote_address().to_string());
+ _cu_calculator->add_batch_get_cu(rpc.dsn_request(), response.error,
response.data);
+ _pfc_batch_get_latency->set(dsn_now_ns() - start_time);
+ return;
+ }
+
+ std::vector<rocksdb::Slice> keys;
+ keys.reserve(request.keys.size());
+ std::vector<::dsn::blob> keys_holder;
+ keys_holder.reserve(request.keys.size());
+ for (auto &key : request.keys) {
+ ::dsn::blob raw_key;
+ pegasus_generate_key(raw_key, key.hash_key, key.sort_key);
+ keys.emplace_back(rocksdb::Slice(raw_key.data(), raw_key.length()));
+ keys_holder.emplace_back(std::move(raw_key));
+ }
+
+ rocksdb::Status final_status;
+ bool error_occurred = false;
+ int64_t total_data_size = 0;
+ uint32_t epoch_now = ::pegasus::utils::epoch_now();
+ std::vector<std::string> values;
+ std::vector<rocksdb::Status> statuses = _db->MultiGet(_data_cf_rd_opts,
keys, &values);
+ response.data.reserve(request.keys.size());
+ for (int i = 0; i < keys.size(); i++) {
+ const auto &status = statuses[i];
+ const ::dsn::blob &hash_key = request.keys[i].hash_key;
+ const ::dsn::blob &sort_key = request.keys[i].sort_key;
+ ::dsn::apps::full_data r;
+ r.hash_key = hash_key;
+ r.sort_key = sort_key;
+ r.exists = false;
+ response.data.emplace_back(std::move(r));
+ ::dsn::apps::full_data &current_data = response.data.back();
Review comment:
Should we just `emplace_back` and return only valid data? I see that you still
return the entry and set its value to an empty `::dsn::blob()`.
##########
File path: src/server/pegasus_server_impl.cpp
##########
@@ -776,6 +776,123 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
_pfc_multi_get_latency->set(dsn_now_ns() - start_time);
}
+void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
+{
+ dassert(_is_open, "");
+ _pfc_batch_get_qps->increment();
+ int64_t start_time = dsn_now_ns();
+
+ auto &response = rpc.response();
+ response.app_id = _gpid.get_app_id();
+ response.partition_index = _gpid.get_partition_index();
+ response.server = _primary_address;
+
+ const auto &request = rpc.request();
+ if (request.keys.empty()) {
+ response.error = rocksdb::Status::kInvalidArgument;
+ derror_replica("Invalid argument for batch_get from {}: 'keys' field
in request is empty",
+ rpc.remote_address().to_string());
+ _cu_calculator->add_batch_get_cu(rpc.dsn_request(), response.error,
response.data);
+ _pfc_batch_get_latency->set(dsn_now_ns() - start_time);
+ return;
+ }
+
+ std::vector<rocksdb::Slice> keys;
+ keys.reserve(request.keys.size());
+ std::vector<::dsn::blob> keys_holder;
+ keys_holder.reserve(request.keys.size());
+ for (auto &key : request.keys) {
+ ::dsn::blob raw_key;
+ pegasus_generate_key(raw_key, key.hash_key, key.sort_key);
+ keys.emplace_back(rocksdb::Slice(raw_key.data(), raw_key.length()));
+ keys_holder.emplace_back(std::move(raw_key));
+ }
+
+ rocksdb::Status final_status;
+ bool error_occurred = false;
+ int64_t total_data_size = 0;
+ uint32_t epoch_now = ::pegasus::utils::epoch_now();
+ std::vector<std::string> values;
+ std::vector<rocksdb::Status> statuses = _db->MultiGet(_data_cf_rd_opts,
keys, &values);
+ response.data.reserve(request.keys.size());
+ for (int i = 0; i < keys.size(); i++) {
+ const auto &status = statuses[i];
+ const ::dsn::blob &hash_key = request.keys[i].hash_key;
+ const ::dsn::blob &sort_key = request.keys[i].sort_key;
+ ::dsn::apps::full_data r;
+ r.hash_key = hash_key;
+ r.sort_key = sort_key;
+ r.exists = false;
+ response.data.emplace_back(std::move(r));
+ ::dsn::apps::full_data &current_data = response.data.back();
+
+ if (status.IsNotFound()) {
+ current_data.value = ::dsn::blob();
+ current_data.exists = false;
+ continue;
+ }
+
+ std::string &value = values[i];
+
+ if (dsn_likely(status.ok())) {
+ if (check_if_record_expired(epoch_now, value)) {
+ if (_verbose_log) {
+ derror_replica("rocksdb data expired for batch_get from
{}",
+ rpc.remote_address().to_string());
+ }
+
+ current_data.value = ::dsn::blob();
+ current_data.exists = false;
+ continue;
+ }
+
+ ::dsn::blob real_value;
+ pegasus_extract_user_data(_pegasus_data_version, std::move(value),
real_value);
+ current_data.value = std::move(real_value);
+ current_data.exists = true;
+ total_data_size += current_data.value.size();
+ } else {
+ if (_verbose_log) {
+ derror_replica(
+ "rocksdb get failed for batch_get from {}: error = {},
key size = {}",
+ replica_name(),
+ rpc.remote_address().to_string(),
+ status.ToString().c_str(),
+ request.keys.size());
+ } else {
+ derror_replica("rocksdb get failed for batch_get from {}:
error = {}",
+ rpc.remote_address().to_string(),
+ status.ToString().c_str());
Review comment:
`status.ToString()` should be OK here (no need for `c_str()`).
##########
File path: src/server/pegasus_server_impl.h
##########
@@ -382,6 +398,8 @@ class pegasus_server_impl : public pegasus_read_service
uint64_t _abnormal_get_size_threshold;
uint64_t _abnormal_multi_get_size_threshold;
uint64_t _abnormal_multi_get_iterate_count_threshold;
+ uint64_t _abnormal_batch_get_size_threshold;
Review comment:
Define this as a `gflag`, and consider whether it should support `auto-update`.
##########
File path: src/server/pegasus_server_impl.cpp
##########
@@ -776,6 +776,123 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
_pfc_multi_get_latency->set(dsn_now_ns() - start_time);
}
+void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
+{
+ dassert(_is_open, "");
+ _pfc_batch_get_qps->increment();
+ int64_t start_time = dsn_now_ns();
+
+ auto &response = rpc.response();
+ response.app_id = _gpid.get_app_id();
+ response.partition_index = _gpid.get_partition_index();
+ response.server = _primary_address;
+
+ const auto &request = rpc.request();
+ if (request.keys.empty()) {
+ response.error = rocksdb::Status::kInvalidArgument;
+ derror_replica("Invalid argument for batch_get from {}: 'keys' field
in request is empty",
+ rpc.remote_address().to_string());
+ _cu_calculator->add_batch_get_cu(rpc.dsn_request(), response.error,
response.data);
+ _pfc_batch_get_latency->set(dsn_now_ns() - start_time);
+ return;
+ }
+
+ std::vector<rocksdb::Slice> keys;
+ keys.reserve(request.keys.size());
+ std::vector<::dsn::blob> keys_holder;
+ keys_holder.reserve(request.keys.size());
+ for (auto &key : request.keys) {
+ ::dsn::blob raw_key;
+ pegasus_generate_key(raw_key, key.hash_key, key.sort_key);
+ keys.emplace_back(rocksdb::Slice(raw_key.data(), raw_key.length()));
+ keys_holder.emplace_back(std::move(raw_key));
+ }
+
+ rocksdb::Status final_status;
+ bool error_occurred = false;
+ int64_t total_data_size = 0;
+ uint32_t epoch_now = ::pegasus::utils::epoch_now();
+ std::vector<std::string> values;
+ std::vector<rocksdb::Status> statuses = _db->MultiGet(_data_cf_rd_opts,
keys, &values);
+ response.data.reserve(request.keys.size());
+ for (int i = 0; i < keys.size(); i++) {
+ const auto &status = statuses[i];
+ const ::dsn::blob &hash_key = request.keys[i].hash_key;
+ const ::dsn::blob &sort_key = request.keys[i].sort_key;
+ ::dsn::apps::full_data r;
+ r.hash_key = hash_key;
+ r.sort_key = sort_key;
+ r.exists = false;
+ response.data.emplace_back(std::move(r));
+ ::dsn::apps::full_data &current_data = response.data.back();
+
+ if (status.IsNotFound()) {
+ current_data.value = ::dsn::blob();
+ current_data.exists = false;
+ continue;
+ }
+
+ std::string &value = values[i];
+
+ if (dsn_likely(status.ok())) {
+ if (check_if_record_expired(epoch_now, value)) {
+ if (_verbose_log) {
+ derror_replica("rocksdb data expired for batch_get from
{}",
+ rpc.remote_address().to_string());
+ }
+
+ current_data.value = ::dsn::blob();
+ current_data.exists = false;
+ continue;
+ }
+
+ ::dsn::blob real_value;
+ pegasus_extract_user_data(_pegasus_data_version, std::move(value),
real_value);
+ current_data.value = std::move(real_value);
+ current_data.exists = true;
+ total_data_size += current_data.value.size();
+ } else {
+ if (_verbose_log) {
+ derror_replica(
+ "rocksdb get failed for batch_get from {}: error = {},
key size = {}",
+ replica_name(),
+ rpc.remote_address().to_string(),
+ status.ToString().c_str(),
Review comment:
Same here: there seems to be no need for `c_str()`.
##########
File path: src/server/pegasus_server_impl.cpp
##########
@@ -776,6 +776,123 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
_pfc_multi_get_latency->set(dsn_now_ns() - start_time);
}
+void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
+{
+ dassert(_is_open, "");
+ _pfc_batch_get_qps->increment();
+ int64_t start_time = dsn_now_ns();
+
+ auto &response = rpc.response();
+ response.app_id = _gpid.get_app_id();
+ response.partition_index = _gpid.get_partition_index();
+ response.server = _primary_address;
+
+ const auto &request = rpc.request();
+ if (request.keys.empty()) {
+ response.error = rocksdb::Status::kInvalidArgument;
+ derror_replica("Invalid argument for batch_get from {}: 'keys' field
in request is empty",
+ rpc.remote_address().to_string());
+ _cu_calculator->add_batch_get_cu(rpc.dsn_request(), response.error,
response.data);
+ _pfc_batch_get_latency->set(dsn_now_ns() - start_time);
+ return;
+ }
+
+ std::vector<rocksdb::Slice> keys;
+ keys.reserve(request.keys.size());
+ std::vector<::dsn::blob> keys_holder;
+ keys_holder.reserve(request.keys.size());
+ for (auto &key : request.keys) {
+ ::dsn::blob raw_key;
+ pegasus_generate_key(raw_key, key.hash_key, key.sort_key);
+ keys.emplace_back(rocksdb::Slice(raw_key.data(), raw_key.length()));
+ keys_holder.emplace_back(std::move(raw_key));
+ }
+
+ rocksdb::Status final_status;
+ bool error_occurred = false;
+ int64_t total_data_size = 0;
+ uint32_t epoch_now = ::pegasus::utils::epoch_now();
+ std::vector<std::string> values;
+ std::vector<rocksdb::Status> statuses = _db->MultiGet(_data_cf_rd_opts,
keys, &values);
+ response.data.reserve(request.keys.size());
+ for (int i = 0; i < keys.size(); i++) {
+ const auto &status = statuses[i];
+ const ::dsn::blob &hash_key = request.keys[i].hash_key;
+ const ::dsn::blob &sort_key = request.keys[i].sort_key;
+ ::dsn::apps::full_data r;
+ r.hash_key = hash_key;
+ r.sort_key = sort_key;
+ r.exists = false;
+ response.data.emplace_back(std::move(r));
+ ::dsn::apps::full_data &current_data = response.data.back();
+
+ if (status.IsNotFound()) {
+ current_data.value = ::dsn::blob();
+ current_data.exists = false;
+ continue;
+ }
+
+ std::string &value = values[i];
+
+ if (dsn_likely(status.ok())) {
+ if (check_if_record_expired(epoch_now, value)) {
+ if (_verbose_log) {
+ derror_replica("rocksdb data expired for batch_get from
{}",
+ rpc.remote_address().to_string());
+ }
+
+ current_data.value = ::dsn::blob();
+ current_data.exists = false;
+ continue;
+ }
+
+ ::dsn::blob real_value;
+ pegasus_extract_user_data(_pegasus_data_version, std::move(value),
real_value);
+ current_data.value = std::move(real_value);
+ current_data.exists = true;
Review comment:
Same comment as above: I think the `exists` flag is redundant.
##########
File path: src/server/pegasus_server_impl.cpp
##########
@@ -776,6 +776,123 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
_pfc_multi_get_latency->set(dsn_now_ns() - start_time);
}
+void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
+{
+ dassert(_is_open, "");
+ _pfc_batch_get_qps->increment();
+ int64_t start_time = dsn_now_ns();
+
+ auto &response = rpc.response();
+ response.app_id = _gpid.get_app_id();
+ response.partition_index = _gpid.get_partition_index();
+ response.server = _primary_address;
+
+ const auto &request = rpc.request();
+ if (request.keys.empty()) {
+ response.error = rocksdb::Status::kInvalidArgument;
+ derror_replica("Invalid argument for batch_get from {}: 'keys' field
in request is empty",
+ rpc.remote_address().to_string());
+ _cu_calculator->add_batch_get_cu(rpc.dsn_request(), response.error,
response.data);
+ _pfc_batch_get_latency->set(dsn_now_ns() - start_time);
+ return;
+ }
+
+ std::vector<rocksdb::Slice> keys;
+ keys.reserve(request.keys.size());
+ std::vector<::dsn::blob> keys_holder;
+ keys_holder.reserve(request.keys.size());
+ for (auto &key : request.keys) {
Review comment:
const auto?
##########
File path: src/server/capacity_unit_calculator.cpp
##########
@@ -175,6 +179,27 @@ void
capacity_unit_calculator::add_multi_get_cu(dsn::message_ex *req,
_read_hotkey_collector->capture_hash_key(hash_key, key_count);
}
+void capacity_unit_calculator::add_batch_get_cu(dsn::message_ex *req,
+ int32_t status,
+ const
std::vector<::dsn::apps::full_data> &datas)
+{
+ int64_t data_size = 0;
+ for (const auto &data : datas) {
+ data_size += data.value.size();
Review comment:
The `cu` should also include the key size, as it does for `get` or `multi_get`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]