This is an automated email from the ASF dual-hosted git repository. yuchenhe pushed a commit to branch v2.3 in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 90fea54d1dd95f90c7a7ba510ac290dee538bf7b Author: cauchy1988 <[email protected]> AuthorDate: Wed Oct 13 18:04:27 2021 +0800 fix: `full_scan` can't scan data completely in some occasions (#825) --- src/base/rrdb_types.cpp | 32 ++++++++++++++++++++++++++++++-- src/client_lib/pegasus_client_impl.cpp | 5 +++-- src/client_lib/pegasus_client_impl.h | 7 +++++-- src/client_lib/pegasus_scanner_impl.cpp | 13 +++++++++---- src/idl/rrdb.thrift | 1 + src/include/rrdb/rrdb_types.h | 19 +++++++++++++++---- src/server/pegasus_server_impl.cpp | 2 +- 7 files changed, 64 insertions(+), 15 deletions(-) diff --git a/src/base/rrdb_types.cpp b/src/base/rrdb_types.cpp index 5e0885e..9eaa7e7 100644 --- a/src/base/rrdb_types.cpp +++ b/src/base/rrdb_types.cpp @@ -3712,6 +3712,12 @@ void get_scanner_request::__set_return_expire_ts(const bool val) __isset.return_expire_ts = true; } +void get_scanner_request::__set_full_scan(const bool val) +{ + this->full_scan = val; + __isset.full_scan = true; +} + uint32_t get_scanner_request::read(::apache::thrift::protocol::TProtocol *iprot) { @@ -3831,6 +3837,14 @@ uint32_t get_scanner_request::read(::apache::thrift::protocol::TProtocol *iprot) xfer += iprot->skip(ftype); } break; + case 13: + if (ftype == ::apache::thrift::protocol::T_BOOL) { + xfer += iprot->readBool(this->full_scan); + this->__isset.full_scan = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -3902,6 +3916,11 @@ uint32_t get_scanner_request::write(::apache::thrift::protocol::TProtocol *oprot xfer += oprot->writeBool(this->return_expire_ts); xfer += oprot->writeFieldEnd(); } + if (this->__isset.full_scan) { + xfer += oprot->writeFieldBegin("full_scan", ::apache::thrift::protocol::T_BOOL, 13); + xfer += oprot->writeBool(this->full_scan); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -3922,6 +3941,7 @@ void swap(get_scanner_request &a, 
get_scanner_request &b) swap(a.sort_key_filter_pattern, b.sort_key_filter_pattern); swap(a.validate_partition_hash, b.validate_partition_hash); swap(a.return_expire_ts, b.return_expire_ts); + swap(a.full_scan, b.full_scan); swap(a.__isset, b.__isset); } @@ -3939,6 +3959,7 @@ get_scanner_request::get_scanner_request(const get_scanner_request &other108) sort_key_filter_pattern = other108.sort_key_filter_pattern; validate_partition_hash = other108.validate_partition_hash; return_expire_ts = other108.return_expire_ts; + full_scan = other108.full_scan; __isset = other108.__isset; } get_scanner_request::get_scanner_request(get_scanner_request &&other109) @@ -3955,6 +3976,7 @@ get_scanner_request::get_scanner_request(get_scanner_request &&other109) sort_key_filter_pattern = std::move(other109.sort_key_filter_pattern); validate_partition_hash = std::move(other109.validate_partition_hash); return_expire_ts = std::move(other109.return_expire_ts); + full_scan = std::move(other109.full_scan); __isset = std::move(other109.__isset); } get_scanner_request &get_scanner_request::operator=(const get_scanner_request &other110) @@ -3971,6 +3993,7 @@ get_scanner_request &get_scanner_request::operator=(const get_scanner_request &o sort_key_filter_pattern = other110.sort_key_filter_pattern; validate_partition_hash = other110.validate_partition_hash; return_expire_ts = other110.return_expire_ts; + full_scan = other110.full_scan; __isset = other110.__isset; return *this; } @@ -3988,6 +4011,7 @@ get_scanner_request &get_scanner_request::operator=(get_scanner_request &&other1 sort_key_filter_pattern = std::move(other111.sort_key_filter_pattern); validate_partition_hash = std::move(other111.validate_partition_hash); return_expire_ts = std::move(other111.return_expire_ts); + full_scan = std::move(other111.full_scan); __isset = std::move(other111.__isset); return *this; } @@ -4021,6 +4045,9 @@ void get_scanner_request::printTo(std::ostream &out) const out << ", " << "return_expire_ts="; 
(__isset.return_expire_ts ? (out << to_string(return_expire_ts)) : (out << "<null>")); + out << ", " + << "full_scan="; + (__isset.full_scan ? (out << to_string(full_scan)) : (out << "<null>")); out << ")"; } @@ -4674,5 +4701,6 @@ void duplicate_response::printTo(std::ostream &out) const (__isset.error_hint ? (out << to_string(error_hint)) : (out << "<null>")); out << ")"; } -} -} // namespace + +} // namespace apps +} // namespace dsn diff --git a/src/client_lib/pegasus_client_impl.cpp b/src/client_lib/pegasus_client_impl.cpp index 826bf4e..9e58942 100644 --- a/src/client_lib/pegasus_client_impl.cpp +++ b/src/client_lib/pegasus_client_impl.cpp @@ -1179,7 +1179,7 @@ int pegasus_client_impl::get_scanner(const std::string &hash_key, if (c < 0 || (c == 0 && o.start_inclusive && o.stop_inclusive)) { v.push_back(pegasus_key_hash(start)); } - scanner = new pegasus_scanner_impl(_client, std::move(v), o, start, stop, false); + scanner = new pegasus_scanner_impl(_client, std::move(v), o, start, stop, false, false); return PERR_OK; } @@ -1223,7 +1223,8 @@ void pegasus_client_impl::async_get_unordered_scanners( std::vector<uint64_t> hash(s); for (int j = 0; j < s; j++) hash[j] = --count; - scanners[i] = new pegasus_scanner_impl(_client, std::move(hash), options, true); + scanners[i] = + new pegasus_scanner_impl(_client, std::move(hash), options, true, true); } } } diff --git a/src/client_lib/pegasus_client_impl.h b/src/client_lib/pegasus_client_impl.h index be33e88..adbbc71 100644 --- a/src/client_lib/pegasus_client_impl.h +++ b/src/client_lib/pegasus_client_impl.h @@ -266,13 +266,15 @@ public: pegasus_scanner_impl(::dsn::apps::rrdb_client *client, std::vector<uint64_t> &&hash, const scan_options &options, - bool validate_partition_hash); + bool validate_partition_hash, + bool full_scan); pegasus_scanner_impl(::dsn::apps::rrdb_client *client, std::vector<uint64_t> &&hash, const scan_options &options, const ::dsn::blob &start_key, const ::dsn::blob &stop_key, - bool 
validate_partition_hash); + bool validate_partition_hash, + bool full_scan); private: ::dsn::apps::rrdb_client *_client; @@ -291,6 +293,7 @@ public: std::list<async_scan_next_callback_t> _queue; volatile bool _rpc_started; bool _validate_partition_hash; + bool _full_scan; void _async_next_internal(); void _start_scan(); diff --git a/src/client_lib/pegasus_scanner_impl.cpp b/src/client_lib/pegasus_scanner_impl.cpp index 487bdd5..ce9baa5 100644 --- a/src/client_lib/pegasus_scanner_impl.cpp +++ b/src/client_lib/pegasus_scanner_impl.cpp @@ -29,8 +29,10 @@ namespace client { pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrdb_client *client, std::vector<uint64_t> &&hash, const scan_options &options, - bool validate_partition_hash) - : pegasus_scanner_impl(client, std::move(hash), options, _min, _max, validate_partition_hash) + bool validate_partition_hash, + bool full_scan) + : pegasus_scanner_impl( + client, std::move(hash), options, _min, _max, validate_partition_hash, full_scan) { _options.start_inclusive = true; _options.stop_inclusive = false; @@ -41,7 +43,8 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd const scan_options &options, const ::dsn::blob &start_key, const ::dsn::blob &stop_key, - bool validate_partition_hash) + bool validate_partition_hash, + bool full_scan) : _client(client), _start_key(start_key), _stop_key(stop_key), @@ -50,7 +53,8 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd _p(-1), _context(SCAN_CONTEXT_ID_COMPLETED), _rpc_started(false), - _validate_partition_hash(validate_partition_hash) + _validate_partition_hash(validate_partition_hash), + _full_scan(full_scan) { } @@ -225,6 +229,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_start_scan() req.no_value = _options.no_value; req.__set_validate_partition_hash(_validate_partition_hash); req.__set_return_expire_ts(_options.return_expire_ts); + req.__set_full_scan(_full_scan); 
dassert(!_rpc_started, ""); _rpc_started = true; diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift index 125efbb..b81483b 100644 --- a/src/idl/rrdb.thrift +++ b/src/idl/rrdb.thrift @@ -255,6 +255,7 @@ struct get_scanner_request 10:dsn.blob sort_key_filter_pattern; 11:optional bool validate_partition_hash; 12:optional bool return_expire_ts; + 13:optional bool full_scan; // true means client want to build 'full scan' context with the server side, false otherwise } struct scan_request diff --git a/src/include/rrdb/rrdb_types.h b/src/include/rrdb/rrdb_types.h index 15f1730..205216a 100644 --- a/src/include/rrdb/rrdb_types.h +++ b/src/include/rrdb/rrdb_types.h @@ -1568,7 +1568,8 @@ typedef struct _get_scanner_request__isset sort_key_filter_type(false), sort_key_filter_pattern(false), validate_partition_hash(false), - return_expire_ts(false) + return_expire_ts(false), + full_scan(false) { } bool start_key : 1; @@ -1583,6 +1584,7 @@ typedef struct _get_scanner_request__isset bool sort_key_filter_pattern : 1; bool validate_partition_hash : 1; bool return_expire_ts : 1; + bool full_scan : 1; } _get_scanner_request__isset; class get_scanner_request @@ -1600,7 +1602,8 @@ public: hash_key_filter_type((filter_type::type)0), sort_key_filter_type((filter_type::type)0), validate_partition_hash(0), - return_expire_ts(0) + return_expire_ts(0), + full_scan(0) { } @@ -1617,6 +1620,7 @@ public: ::dsn::blob sort_key_filter_pattern; bool validate_partition_hash; bool return_expire_ts; + bool full_scan; _get_scanner_request__isset __isset; @@ -1644,6 +1648,8 @@ public: void __set_return_expire_ts(const bool val); + void __set_full_scan(const bool val); + bool operator==(const get_scanner_request &rhs) const { if (!(start_key == rhs.start_key)) @@ -1675,6 +1681,10 @@ public: return false; else if (__isset.return_expire_ts && !(return_expire_ts == rhs.return_expire_ts)) return false; + if (__isset.full_scan != rhs.__isset.full_scan) + return false; + else if (__isset.full_scan && 
!(full_scan == rhs.full_scan)) + return false; return true; } bool operator!=(const get_scanner_request &rhs) const { return !(*this == rhs); } @@ -1967,7 +1977,8 @@ inline std::ostream &operator<<(std::ostream &out, const duplicate_response &obj obj.printTo(out); return out; } -} -} // namespace + +} // namespace apps +} // namespace dsn #endif diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 9375bbe..34ef06e 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -945,7 +945,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc) if (_data_cf_opts.prefix_extractor) { ::dsn::blob start_hash_key, tmp; pegasus_restore_key(request.start_key, start_hash_key, tmp); - if (start_hash_key.size() == 0) { + if (start_hash_key.size() == 0 || request.full_scan) { // hash_key is not passed, only happened when do full scan (scanners got by // get_unordered_scanners) on a partition, we have to do total order seek on rocksDB. rd_opts.total_order_seek = true; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
