This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new f8dcf3a Upgrade C++ to protobuf 3.12
f8dcf3a is described below
commit f8dcf3a14ac5f07babdad334f4e6307c851d9b81
Author: Todd Lipcon <[email protected]>
AuthorDate: Wed Jul 1 16:18:02 2020 -0700
Upgrade C++ to protobuf 3.12
This has various performance improvements, and also enables Arena
support by default (used in a follow-on commit).
Change-Id: Ifd5e6bbdd2c517b1af2ff833701983315111bba4
Reviewed-on: http://gerrit.cloudera.org:8080/16135
Reviewed-by: Alexey Serbin <[email protected]>
Tested-by: Kudu Jenkins
---
build-support/ubsan-blacklist.txt | 4 ++
src/kudu/cfile/bloomfile.cc | 2 +-
src/kudu/cfile/cfile_reader.cc | 4 +-
src/kudu/cfile/cfile_writer.cc | 2 +-
src/kudu/consensus/consensus_queue-test.cc | 3 +-
src/kudu/consensus/consensus_queue.cc | 2 +-
src/kudu/consensus/log-test-base.h | 10 ++--
src/kudu/consensus/log-test.cc | 9 ++--
src/kudu/consensus/log.cc | 3 +-
src/kudu/consensus/log.h | 2 +-
src/kudu/consensus/log_cache.cc | 15 +++---
src/kudu/consensus/log_cache.h | 11 ++--
src/kudu/consensus/log_reader.cc | 5 +-
src/kudu/consensus/log_util.cc | 6 +--
src/kudu/rpc/exactly_once_rpc-test.cc | 4 +-
src/kudu/rpc/inbound_call.cc | 4 +-
src/kudu/rpc/outbound_call.cc | 6 ++-
src/kudu/rpc/result_tracker.h | 2 +-
src/kudu/rpc/serialization.cc | 8 +--
src/kudu/rpc/serialization.h | 2 +-
src/kudu/tablet/ops/op_tracker.cc | 2 +-
src/kudu/tablet/tablet_metadata-test.cc | 2 +-
src/kudu/tools/tool_action_pbc.cc | 6 ++-
src/kudu/tools/tool_action_table.cc | 7 ++-
.../tserver/tablet_copy_source_session-test.cc | 5 +-
src/kudu/util/pb_util.cc | 61 ++++++++++++++--------
src/kudu/util/protobuf_util.h | 17 +++---
thirdparty/vars.sh | 2 +-
28 files changed, 122 insertions(+), 84 deletions(-)
diff --git a/build-support/ubsan-blacklist.txt
b/build-support/ubsan-blacklist.txt
index 873c4cd..eb544d0 100644
--- a/build-support/ubsan-blacklist.txt
+++ b/build-support/ubsan-blacklist.txt
@@ -42,8 +42,12 @@ src:*/boost/functional/hash/hash.hpp
# Protobuf's coded output stream and hash implementations have
unsigned-integer overflows:
# include/google/protobuf/io/coded_stream.h:925:61: runtime error: unsigned
integer overflow: 0 - 1 cannot be represented in type 'unsigned int'
# include/google/protobuf/stubs/hash.h:353:18: runtime error: unsigned integer
overflow: 5 * 5697019175939388048 cannot be represented in type 'unsigned long'
+# include/google/protobuf/map.h:951:21: runtime error: unsigned integer
overflow: 11400714819323198485 * 6142505534017268963 cannot be represented in
type 'unsigned long'
+# include/google/protobuf/parse_context.h:191:57: runtime error: unsigned
integer overflow: 0 - 1 cannot be represented in type 'unsigned int'
src:*/google/protobuf/io/coded_stream.h
src:*/google/protobuf/stubs/hash.h
+src:*/google/protobuf/map.h
+src:*/google/protobuf/parse_context.h
# Sparsepp has an unsigned-integer negation overflow:
# /include/sparsepp/spp.h:1196:20: runtime error: negation of 8192 cannot be
represented in type 'group_bm_type' (aka 'unsigned int')
diff --git a/src/kudu/cfile/bloomfile.cc b/src/kudu/cfile/bloomfile.cc
index d541f87..b33fcab 100644
--- a/src/kudu/cfile/bloomfile.cc
+++ b/src/kudu/cfile/bloomfile.cc
@@ -170,7 +170,7 @@ Status BloomFileWriter::FinishCurrentBloomBlock() {
BloomBlockHeaderPB hdr;
hdr.set_num_hash_functions(bloom_builder_.n_hashes());
faststring hdr_str;
- PutFixed32(&hdr_str, hdr.ByteSize());
+ PutFixed32(&hdr_str, static_cast<uint32_t>(hdr.ByteSizeLong()));
pb_util::AppendToString(hdr, &hdr_str);
// The data is the concatenation of the header and the bloom itself.
diff --git a/src/kudu/cfile/cfile_reader.cc b/src/kudu/cfile/cfile_reader.cc
index d4caa36..5473b4c 100644
--- a/src/kudu/cfile/cfile_reader.cc
+++ b/src/kudu/cfile/cfile_reader.cc
@@ -605,10 +605,10 @@ size_t CFileReader::memory_footprint() const {
// the size of base objects (recursively too), thus not accounting for
// malloc "slop".
if (header_) {
- size += header_->SpaceUsed();
+ size += header_->SpaceUsedLong();
}
if (footer_) {
- size += footer_->SpaceUsed();
+ size += footer_->SpaceUsedLong();
}
return size;
}
diff --git a/src/kudu/cfile/cfile_writer.cc b/src/kudu/cfile/cfile_writer.cc
index ab09ea3..033dfcb 100644
--- a/src/kudu/cfile/cfile_writer.cc
+++ b/src/kudu/cfile/cfile_writer.cc
@@ -157,7 +157,7 @@ Status CFileWriter::Start() {
CFileHeaderPB header;
FlushMetadataToPB(header.mutable_metadata());
- uint32_t pb_size = header.ByteSize();
+ uint32_t pb_size = header.ByteSizeLong();
faststring header_str;
// First the magic.
diff --git a/src/kudu/consensus/consensus_queue-test.cc
b/src/kudu/consensus/consensus_queue-test.cc
index cf6fc68..8c56737 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -429,7 +429,8 @@ TEST_F(ConsensusQueueTest, TestGetPagedMessages) {
// Save the current flag state.
google::FlagSaver saver;
- FLAGS_consensus_max_batch_size_bytes = page_size_estimator.ByteSize();
+ FLAGS_consensus_max_batch_size_bytes =
+ static_cast<int32_t>(page_size_estimator.ByteSizeLong());
ConsensusRequestPB request;
ConsensusResponsePB response;
diff --git a/src/kudu/consensus/consensus_queue.cc
b/src/kudu/consensus/consensus_queue.cc
index 9d2b6f7..18c64d6 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -693,7 +693,7 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
// The batch of messages to send to the peer.
vector<ReplicateRefPtr> messages;
- int max_batch_size = FLAGS_consensus_max_batch_size_bytes -
request->ByteSize();
+ int64_t max_batch_size = FLAGS_consensus_max_batch_size_bytes -
request->ByteSizeLong();
// We try to get the follower's next_index from our log.
Status s = log_cache_.ReadOps(peer_copy.next_index - 1,
diff --git a/src/kudu/consensus/log-test-base.h
b/src/kudu/consensus/log-test-base.h
index 2041e81..0b412f7 100644
--- a/src/kudu/consensus/log-test-base.h
+++ b/src/kudu/consensus/log-test-base.h
@@ -70,7 +70,7 @@ inline Status AppendNoOpsToLogSync(clock::Clock* clock,
Log* log,
consensus::OpId* op_id,
int count,
- int* size = nullptr) {
+ size_t* size = nullptr) {
std::vector<consensus::ReplicateRefPtr> replicates;
for (int i = 0; i < count; i++) {
@@ -88,7 +88,7 @@ inline Status AppendNoOpsToLogSync(clock::Clock* clock,
if (size) {
// If we're tracking the sizes we need to account for the fact that the
Log wraps the
// log entry in an LogEntryBatchPB, and each actual entry will have a
one-byte tag.
- *size += repl->ByteSize() + 1;
+ *size += repl->ByteSizeLong() + 1;
}
replicates.push_back(replicate);
}
@@ -107,7 +107,7 @@ inline Status AppendNoOpsToLogSync(clock::Clock* clock,
inline Status AppendNoOpToLogSync(clock::Clock* clock,
Log* log,
consensus::OpId* op_id,
- int* size = nullptr) {
+ size_t* size = nullptr) {
return AppendNoOpsToLogSync(clock, log, op_id, 1, size);
}
@@ -339,7 +339,7 @@ class LogTestBase : public KuduTest {
// Append a single NO_OP entry. Increments op_id by one.
// If non-NULL, and if the write is successful, 'size' is incremented
// by the size of the written operation.
- Status AppendNoOp(consensus::OpId* op_id, int* size = nullptr) {
+ Status AppendNoOp(consensus::OpId* op_id, size_t* size = nullptr) {
return AppendNoOpToLogSync(clock_.get(), log_.get(), op_id, size);
}
@@ -347,7 +347,7 @@ class LogTestBase : public KuduTest {
// Increments op_id's index by the number of records written.
// If non-NULL, 'size' keeps track of the size of the operations
// successfully written.
- Status AppendNoOps(consensus::OpId* op_id, int num, int* size = nullptr) {
+ Status AppendNoOps(consensus::OpId* op_id, int num, size_t* size = nullptr) {
for (int i = 0; i < num; i++) {
RETURN_NOT_OK(AppendNoOp(op_id, size));
}
diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc
index 2efe492..3750a35 100644
--- a/src/kudu/consensus/log-test.cc
+++ b/src/kudu/consensus/log-test.cc
@@ -18,6 +18,7 @@
#include "kudu/consensus/log.h"
#include <algorithm>
+#include <cstddef>
#include <cerrno>
#include <cstdint>
#include <functional>
@@ -467,9 +468,9 @@ TEST_F(LogTest, TestWriteAndReadToAndFromInProgressSegment)
{
repl->set_timestamp(0L);
// Entries are prefixed with a header.
- int64_t single_entry_size = batch.ByteSize() + kEntryHeaderSizeV2;
+ int64_t single_entry_size = batch.ByteSizeLong() + kEntryHeaderSizeV2;
- int written_entries_size = header_size;
+ size_t written_entries_size = header_size;
ASSERT_OK(AppendNoOps(&op_id, kNumEntries, &written_entries_size));
ASSERT_EQ(single_entry_size * kNumEntries + header_size,
written_entries_size);
ASSERT_EQ(written_entries_size,
log_->segment_allocator_.active_segment_->written_offset());
@@ -1015,13 +1016,13 @@ TEST_P(LogTestOptionalCompression,
TestReadLogWithReplacedReplicates) {
ElementDeleter d(&repls);
ASSERT_OK(reader->ReadReplicatesInRange(start_index, end_index,
size_limit, &repls));
ASSERT_LE(repls.size(), end_index - start_index + 1);
- int total_size = 0;
+ size_t total_size = 0;
int expected_index = start_index;
for (const ReplicateMsg* repl : repls) {
ASSERT_EQ(expected_index, repl->id().index());
ASSERT_EQ(terms_by_index[expected_index], repl->id().term());
expected_index++;
- total_size += repl->SpaceUsed();
+ total_size += repl->SpaceUsedLong();
}
if (total_size > size_limit) {
ASSERT_EQ(1, repls.size());
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index d4f5fb2..67c26d2 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -23,7 +23,6 @@
#include <memory>
#include <mutex>
#include <ostream>
-#include <type_traits>
#include <utility>
#include <boost/range/adaptor/reversed.hpp>
@@ -1240,7 +1239,7 @@ LogEntryBatch::LogEntryBatch(LogEntryTypePB type,
entry_batch_pb_(std::move(entry_batch_pb)),
total_size_bytes_(
PREDICT_FALSE(count == 1 && entry_batch_pb_.entry(0).type() ==
FLUSH_MARKER)
- ? 0 : entry_batch_pb_.ByteSize()),
+ ? 0 : entry_batch_pb_.ByteSizeLong()),
count_(count),
callback_(std::move(cb)) {
}
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index 556a99c..9c6484d 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -615,7 +615,7 @@ class LogEntryBatch {
LogEntryBatchPB entry_batch_pb_;
// Total size in bytes of all entries
- const uint32_t total_size_bytes_;
+ const size_t total_size_bytes_;
// Number of entries in 'entry_batch_pb_'
const size_t count_;
diff --git a/src/kudu/consensus/log_cache.cc b/src/kudu/consensus/log_cache.cc
index 78220d1..67cf6f4 100644
--- a/src/kudu/consensus/log_cache.cc
+++ b/src/kudu/consensus/log_cache.cc
@@ -28,7 +28,6 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <google/protobuf/wire_format_lite.h>
-#include <google/protobuf/wire_format_lite_inl.h>
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/log.h"
@@ -110,7 +109,7 @@ LogCache::LogCache(const scoped_refptr<MetricEntity>&
metric_entity,
// code paths elsewhere.
auto zero_op = new ReplicateMsg();
*zero_op->mutable_id() = MinimumOpId();
- InsertOrDie(&cache_, 0, { make_scoped_refptr_replicate(zero_op),
zero_op->SpaceUsed() });
+ InsertOrDie(&cache_, 0, { make_scoped_refptr_replicate(zero_op),
zero_op->SpaceUsedLong() });
}
LogCache::~LogCache() {
@@ -158,7 +157,7 @@ Status LogCache::AppendOperations(vector<ReplicateRefPtr>
msgs,
vector<CacheEntry> entries_to_insert;
entries_to_insert.reserve(msgs.size());
for (const auto& msg : msgs) {
- CacheEntry e = { msg, static_cast<int64_t>(msg->get()->SpaceUsedLong()) };
+ CacheEntry e = { msg, msg->get()->SpaceUsedLong() };
mem_required += e.mem_usage;
entries_to_insert.emplace_back(std::move(e));
}
@@ -283,15 +282,15 @@ namespace {
// this message as part of a consensus update request. This accounts for the
// length delimiting and tagging of the message.
int64_t TotalByteSizeForMessage(const ReplicateMsg& msg) {
- int msg_size =
google::protobuf::internal::WireFormatLite::LengthDelimitedSize(
- msg.ByteSize());
+ int64_t msg_size =
google::protobuf::internal::WireFormatLite::LengthDelimitedSize(
+ msg.ByteSizeLong());
msg_size += 1; // for the type tag
return msg_size;
}
} // anonymous namespace
Status LogCache::ReadOps(int64_t after_op_index,
- int max_size_bytes,
+ int64_t max_size_bytes,
std::vector<ReplicateRefPtr>* messages,
OpId* preceding_op) {
DCHECK_GE(after_op_index, 0);
@@ -470,7 +469,7 @@ void LogCache::DumpToStrings(vector<string>* lines) const {
Substitute("Message[$0] $1.$2 : REPLICATE. Type: $3, Size: $4",
counter++, msg->id().term(), msg->id().index(),
OperationType_Name(msg->op_type()),
- msg->ByteSize()));
+ msg->ByteSizeLong()));
}
}
@@ -489,7 +488,7 @@ void LogCache::DumpToHtml(std::ostream& out) const {
"<td>$4</td><td>$5</td></tr>",
counter++, msg->id().term(), msg->id().index(),
OperationType_Name(msg->op_type()),
- msg->ByteSize(), SecureShortDebugString(msg->id())) <<
endl;
+ msg->ByteSizeLong(), SecureShortDebugString(msg->id()))
<< endl;
}
out << "</table>";
}
diff --git a/src/kudu/consensus/log_cache.h b/src/kudu/consensus/log_cache.h
index 93b35d1..9091308 100644
--- a/src/kudu/consensus/log_cache.h
+++ b/src/kudu/consensus/log_cache.h
@@ -17,6 +17,7 @@
#ifndef KUDU_CONSENSUS_LOG_CACHE_H
#define KUDU_CONSENSUS_LOG_CACHE_H
+#include <cstddef>
#include <cstdint>
#include <iosfwd>
#include <map>
@@ -72,9 +73,9 @@ class LogCache {
// If such an op exists in the log, an OK result will always include at
least one
// operation.
//
- // The result will be limited such that the total ByteSize() of the returned
ops
- // is less than max_size_bytes, unless that would result in an empty result,
in
- // which case exactly one op is returned.
+ // The result will be limited such that the total ByteSizeLong() of the
+ // returned ops is less than max_size_bytes, unless that would result in an
+ // empty result, in which case exactly one op is returned.
//
// The OpId which precedes the returned ops is returned in *preceding_op.
// The index of this OpId will match 'after_op_index'.
@@ -83,7 +84,7 @@ class LogCache {
// read these ops from disk. Therefore, this function may take a substantial
amount
// of time and should not be called with important locks held, etc.
Status ReadOps(int64_t after_op_index,
- int max_size_bytes,
+ int64_t max_size_bytes,
std::vector<ReplicateRefPtr>* messages,
OpId* preceding_op);
@@ -154,7 +155,7 @@ class LogCache {
ReplicateRefPtr msg;
// The cached value of msg->SpaceUsedLong(). This method is expensive
// to compute, so we compute it only once upon insertion.
- int64_t mem_usage;
+ size_t mem_usage;
};
// Try to evict the oldest operations from the queue, stopping either when
diff --git a/src/kudu/consensus/log_reader.cc b/src/kudu/consensus/log_reader.cc
index 10ee0d0..24a4739 100644
--- a/src/kudu/consensus/log_reader.cc
+++ b/src/kudu/consensus/log_reader.cc
@@ -18,6 +18,7 @@
#include "kudu/consensus/log_reader.h"
#include <algorithm>
+#include <cstddef>
#include <memory>
#include <mutex>
#include <ostream>
@@ -288,7 +289,7 @@ Status LogReader::ReadReplicatesInRange(int64_t starting_at,
ElementDeleter d(&replicates_tmp);
LogIndexEntry prev_index_entry;
- int64_t total_size = 0;
+ size_t total_size = 0;
bool limit_exceeded = false;
faststring tmp_buf;
LogEntryBatchPB batch;
@@ -331,7 +332,7 @@ Status LogReader::ReadReplicatesInRange(int64_t starting_at,
continue;
}
- int64_t space_required = entry->replicate().SpaceUsed();
+ size_t space_required = entry->replicate().SpaceUsedLong();
if (replicates_tmp.empty() ||
max_bytes_to_read <= 0 ||
total_size + space_required < max_bytes_to_read) {
diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc
index 810f369..08bab3a 100644
--- a/src/kudu/consensus/log_util.cc
+++ b/src/kudu/consensus/log_util.cc
@@ -121,7 +121,7 @@ LogEntryReader::LogEntryReader(const ReadableLogSegment*
seg)
// If we have a footer we only read up to it. If we don't we likely crashed
// and always read to the end.
read_up_to_ = (seg_->footer_.IsInitialized() && !seg_->footer_was_rebuilt_) ?
- seg_->file_size() - seg_->footer_.ByteSize() -
kLogSegmentFooterMagicAndFooterLength :
+ seg_->file_size() - seg_->footer_.ByteSizeLong() -
kLogSegmentFooterMagicAndFooterLength :
readable_to_offset;
VLOG(1) << "Reading segment entries from "
<< seg_->path_ << ": offset=" << offset_ << " file_size="
@@ -782,7 +782,7 @@ Status WritableLogSegment::WriteHeader(const
LogSegmentHeaderPB& new_header) {
// First the magic.
buf.append(kLogSegmentHeaderMagicString);
// Then Length-prefixed header.
- PutFixed32(&buf, new_header.ByteSize());
+ PutFixed32(&buf, static_cast<uint32_t>(new_header.ByteSizeLong()));
// Then Serialize the PB.
pb_util::AppendToString(new_header, &buf);
RETURN_NOT_OK(file_->Write(0, Slice(buf)));
@@ -804,7 +804,7 @@ Status WritableLogSegment::WriteFooter(const
LogSegmentFooterPB& footer) {
faststring buf;
pb_util::AppendToString(footer, &buf);
buf.append(kLogSegmentFooterMagicString);
- PutFixed32(&buf, footer.ByteSize());
+ PutFixed32(&buf, static_cast<uint32_t>(footer.ByteSizeLong()));
RETURN_NOT_OK_PREPEND(file_->Write(written_offset_, Slice(buf)),
"Could not write the footer");
diff --git a/src/kudu/rpc/exactly_once_rpc-test.cc
b/src/kudu/rpc/exactly_once_rpc-test.cc
index 5a93e3a..b2841af 100644
--- a/src/kudu/rpc/exactly_once_rpc-test.cc
+++ b/src/kudu/rpc/exactly_once_rpc-test.cc
@@ -399,9 +399,9 @@ TEST_F(ExactlyOnceRpcTest,
TestExactlyOnceSemanticsAfterRpcCompleted) {
// The incremental usage of a new client is the size of the response itself
// plus some fixed overhead for the client-tracking structure.
- int expected_incremental_usage = original_resp.SpaceUsed() + 200;
+ size_t expected_incremental_usage = original_resp.SpaceUsedLong() + 200;
- int mem_consumption_after = mem_tracker_->consumption();
+ size_t mem_consumption_after = mem_tracker_->consumption();
ASSERT_GT(mem_consumption_after - mem_consumption,
expected_incremental_usage);
mem_consumption = mem_consumption_after;
}
diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc
index 179a1d3..1b4c82b 100644
--- a/src/kudu/rpc/inbound_call.cc
+++ b/src/kudu/rpc/inbound_call.cc
@@ -18,6 +18,7 @@
#include "kudu/rpc/inbound_call.h"
#include <cstdint>
+#include <limits>
#include <memory>
#include <ostream>
@@ -178,7 +179,8 @@ void InboundCall::SerializeResponseBuffer(const
MessageLite& response,
// happened.
}
- uint32_t protobuf_msg_size = response.ByteSize();
+ size_t protobuf_msg_size = response.ByteSizeLong();
+ CHECK_LE(protobuf_msg_size, std::numeric_limits<uint32_t>::max());
ResponseHeader resp_hdr;
resp_hdr.set_call_id(header_.call_id());
diff --git a/src/kudu/rpc/outbound_call.cc b/src/kudu/rpc/outbound_call.cc
index 66557b9..e2f1fd3 100644
--- a/src/kudu/rpc/outbound_call.cc
+++ b/src/kudu/rpc/outbound_call.cc
@@ -19,6 +19,8 @@
#include <cstddef>
#include <cstdint>
+#include <functional>
+#include <limits>
#include <memory>
#include <mutex>
#include <string>
@@ -27,7 +29,6 @@
#include <vector>
#include <boost/container/vector.hpp>
-#include <boost/function.hpp>
#include <gflags/gflags.h>
#include <google/protobuf/message.h>
@@ -145,7 +146,8 @@ void OutboundCall::SetRequestPayload(const Message& req,
// Compute total size of sidecar payload so that extra space can be reserved
as part of
// the request body.
- uint32_t message_size = req.ByteSize();
+ size_t message_size = req.ByteSizeLong();
+ CHECK_LE(message_size, std::numeric_limits<uint32_t>::max());
sidecar_byte_size_ = 0;
for (const unique_ptr<RpcSidecar>& car: sidecars_) {
header_.add_sidecar_offsets(sidecar_byte_size_ + message_size);
diff --git a/src/kudu/rpc/result_tracker.h b/src/kudu/rpc/result_tracker.h
index e275e37..79d43c1 100644
--- a/src/kudu/rpc/result_tracker.h
+++ b/src/kudu/rpc/result_tracker.h
@@ -283,7 +283,7 @@ class ResultTracker : public
RefCountedThreadSafe<ResultTracker> {
int64_t memory_footprint() const {
return kudu_malloc_usable_size(this)
+ (ongoing_rpcs.capacity() > 0 ?
kudu_malloc_usable_size(ongoing_rpcs.data()) : 0)
- + (response.get() != nullptr ? response->SpaceUsed() : 0);
+ + (response ? response->SpaceUsedLong() : 0);
}
};
diff --git a/src/kudu/rpc/serialization.cc b/src/kudu/rpc/serialization.cc
index 7551465..b845407 100644
--- a/src/kudu/rpc/serialization.cc
+++ b/src/kudu/rpc/serialization.cc
@@ -55,8 +55,8 @@ enum {
void SerializeMessage(const MessageLite& message, faststring* param_buf,
int additional_size, bool use_cached_size) {
DCHECK_GE(additional_size, 0);
- int pb_size = use_cached_size ? message.GetCachedSize() : message.ByteSize();
- DCHECK_EQ(message.ByteSize(), pb_size);
+ size_t pb_size = use_cached_size ? message.GetCachedSize() :
message.ByteSizeLong();
+ DCHECK_EQ(message.ByteSizeLong(), pb_size);
// Use 8-byte integers to avoid overflowing when additional_size approaches
INT_MAX.
int64_t recorded_size = static_cast<int64_t>(pb_size) +
static_cast<int64_t>(additional_size);
@@ -90,7 +90,7 @@ void SerializeHeader(const MessageLite& header,
<< "RPC header missing fields: " << header.InitializationErrorString();
// Compute all the lengths for the packet.
- size_t header_pb_len = header.ByteSize();
+ size_t header_pb_len = header.ByteSizeLong();
size_t header_tot_len = kMsgLengthPrefixLength // Int prefix for the
total length.
+ CodedOutputStream::VarintSize32(header_pb_len) // Varint delimiter
for header PB.
+ header_pb_len; // Length for the
header PB itself.
@@ -136,7 +136,7 @@ Status ParseMessage(const Slice& buf,
// Protobuf enforces a 64MB total bytes limit on CodedInputStream by default.
// Override this default with the actual size of the buffer to allow messages
// larger than 64MB.
- in.SetTotalBytesLimit(buf.size(), -1);
+ in.SetTotalBytesLimit(buf.size());
in.Skip(kMsgLengthPrefixLength);
uint32_t header_len;
diff --git a/src/kudu/rpc/serialization.h b/src/kudu/rpc/serialization.h
index 9fa3858..b1d7c84 100644
--- a/src/kudu/rpc/serialization.h
+++ b/src/kudu/rpc/serialization.h
@@ -43,7 +43,7 @@ namespace serialization {
// the protobuf itself).
// 'use_cached_size' Additional optional argument whether to use the
cached
// or explicit byte size by calling MessageLite::GetCachedSize() or
-// MessageLite::ByteSize(), respectively.
+// MessageLite::ByteSizeLong(), respectively.
// Out: The faststring 'param_buf' to be populated with the serialized bytes.
// The faststring's length is only determined by the amount that
// needs to be serialized for the protobuf (i.e., no additional space
diff --git a/src/kudu/tablet/ops/op_tracker.cc
b/src/kudu/tablet/ops/op_tracker.cc
index 97ea0fd..98153a2 100644
--- a/src/kudu/tablet/ops/op_tracker.cc
+++ b/src/kudu/tablet/ops/op_tracker.cc
@@ -145,7 +145,7 @@ OpTracker::~OpTracker() {
}
Status OpTracker::Add(OpDriver* driver) {
- int64_t driver_mem_footprint = driver->state()->request()->SpaceUsed();
+ size_t driver_mem_footprint = driver->state()->request()->SpaceUsedLong();
if (mem_tracker_ && !mem_tracker_->TryConsume(driver_mem_footprint)) {
if (metrics_) {
metrics_->transaction_memory_pressure_rejections->Increment();
diff --git a/src/kudu/tablet/tablet_metadata-test.cc
b/src/kudu/tablet/tablet_metadata-test.cc
index 7ef0433..4dd511f 100644
--- a/src/kudu/tablet/tablet_metadata-test.cc
+++ b/src/kudu/tablet/tablet_metadata-test.cc
@@ -192,7 +192,7 @@ TEST_F(TestTabletMetadata, TestOnDiskSize) {
// so the on_disk_size should be at least as big.
TabletSuperBlockPB superblock_pb;
ASSERT_OK(meta->ToSuperBlock(&superblock_pb));
- ASSERT_GE(final_size, superblock_pb.ByteSize());
+ ASSERT_GE(final_size, superblock_pb.ByteSizeLong());
}
TEST_F(TestTabletMetadata, BenchmarkCollectBlockIds) {
diff --git a/src/kudu/tools/tool_action_pbc.cc
b/src/kudu/tools/tool_action_pbc.cc
index 2ce652a..ecb3926 100644
--- a/src/kudu/tools/tool_action_pbc.cc
+++ b/src/kudu/tools/tool_action_pbc.cc
@@ -47,6 +47,8 @@
#include "kudu/util/status.h"
#include "kudu/util/subprocess.h"
+using google::protobuf::util::JsonParseOptions;
+using google::protobuf::util::JsonStringToMessage;
using std::cout;
using std::string;
using std::unique_ptr;
@@ -198,9 +200,11 @@ Status EditFile(const RunnerContext& context) {
unique_ptr<google::protobuf::Message> m(prototype->New());
vector<string> lines;
RETURN_NOT_OK(LoadFileToLines(tmp_json_path, &lines));
+ JsonParseOptions opts;
+ opts.case_insensitive_enum_parsing = true;
for (const string& l : lines) {
m->Clear();
- const auto& google_status =
google::protobuf::util::JsonStringToMessage(l, m.get());
+ const auto& google_status = JsonStringToMessage(l, m.get(), opts);
if (!google_status.ok()) {
return Status::InvalidArgument(
Substitute("Unable to parse JSON line: $0", l),
diff --git a/src/kudu/tools/tool_action_table.cc
b/src/kudu/tools/tool_action_table.cc
index 006cc71..ff09039 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -61,6 +61,8 @@
#include "kudu/util/jsonreader.h"
#include "kudu/util/status.h"
+using google::protobuf::util::JsonStringToMessage;
+using google::protobuf::util::JsonParseOptions;
using google::protobuf::RepeatedPtrField;
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
@@ -1136,8 +1138,9 @@ Status ParseTablePartition(const PartitionPB& partition,
Status CreateTable(const RunnerContext& context) {
const string& json_str = FindOrDie(context.required_args,
kCreateTableJSONArg);
CreateTablePB table_req;
- const auto& google_status =
- google::protobuf::util::JsonStringToMessage(json_str, &table_req);
+ JsonParseOptions opts;
+ opts.case_insensitive_enum_parsing = true;
+ const auto& google_status = JsonStringToMessage(json_str, &table_req, opts);
if (!google_status.ok()) {
return Status::InvalidArgument(
Substitute("unable to parse JSON: $0", json_str),
diff --git a/src/kudu/tserver/tablet_copy_source_session-test.cc
b/src/kudu/tserver/tablet_copy_source_session-test.cc
index 82f271a..27f3daa 100644
--- a/src/kudu/tserver/tablet_copy_source_session-test.cc
+++ b/src/kudu/tserver/tablet_copy_source_session-test.cc
@@ -17,6 +17,7 @@
#include "kudu/tserver/tablet_copy_source_session.h"
+#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
@@ -279,7 +280,7 @@ TEST_F(TabletCopyTest, TestSuperBlocksEqual) {
{
const TabletSuperBlockPB& session_superblock =
session_->tablet_superblock();
- int size = session_superblock.ByteSize();
+ size_t size = session_superblock.ByteSizeLong();
session_buf.resize(size);
uint8_t* session_dst = session_buf.data();
session_superblock.SerializeWithCachedSizesToArray(session_dst);
@@ -288,7 +289,7 @@ TEST_F(TabletCopyTest, TestSuperBlocksEqual) {
{
TabletSuperBlockPB tablet_superblock;
ASSERT_OK(tablet()->metadata()->ToSuperBlock(&tablet_superblock));
- int size = tablet_superblock.ByteSize();
+ size_t size = tablet_superblock.ByteSizeLong();
tablet_buf.resize(size);
uint8_t* tablet_dst = tablet_buf.data();
tablet_superblock.SerializeWithCachedSizesToArray(tablet_dst);
diff --git a/src/kudu/util/pb_util.cc b/src/kudu/util/pb_util.cc
index 6237c80..f3855f0 100644
--- a/src/kudu/util/pb_util.cc
+++ b/src/kudu/util/pb_util.cc
@@ -27,6 +27,7 @@
#include <cstddef>
#include <deque>
#include <initializer_list>
+#include <limits>
#include <memory>
#include <mutex>
#include <ostream>
@@ -140,9 +141,9 @@ namespace {
// protobuf implementation but is more likely caused by concurrent modification
// of the message. This function attempts to distinguish between the two and
// provide a useful error message.
-void ByteSizeConsistencyError(int byte_size_before_serialization,
- int byte_size_after_serialization,
- int bytes_produced_by_serialization) {
+void ByteSizeConsistencyError(size_t byte_size_before_serialization,
+ size_t byte_size_after_serialization,
+ size_t bytes_produced_by_serialization) {
CHECK_EQ(byte_size_before_serialization, byte_size_after_serialization)
<< "Protocol message was modified concurrently during serialization.";
CHECK_EQ(bytes_produced_by_serialization, byte_size_before_serialization)
@@ -365,7 +366,7 @@ Status ReadPBStartingAt(ReadableFileType* reader, int
version,
// integer overflow warnings, so that's what we'll use.
ArrayInputStream ais(body.data(), body.size());
CodedInputStream cis(&ais);
- cis.SetTotalBytesLimit(512 * 1024 * 1024, -1);
+ cis.SetTotalBytesLimit(512 * 1024 * 1024);
if (PREDICT_FALSE(!msg->ParseFromCodedStream(&cis))) {
return Status::IOError("Unable to parse PB from path", reader->filename());
}
@@ -471,16 +472,16 @@ void AppendToString(const MessageLite &msg, faststring
*output) {
void AppendPartialToString(const MessageLite &msg, faststring* output) {
size_t old_size = output->size();
- int byte_size = msg.ByteSize();
+ size_t byte_size = msg.ByteSizeLong();
// Messages >2G cannot be serialized due to overflow computing ByteSize.
DCHECK_GE(byte_size, 0) << "Error computing ByteSize";
- output->resize(old_size + static_cast<size_t>(byte_size));
+ output->resize(old_size + byte_size);
uint8* start = &((*output)[old_size]);
uint8* end = msg.SerializeWithCachedSizesToArray(start);
if (end - start != byte_size) {
- ByteSizeConsistencyError(byte_size, msg.ByteSize(), end - start);
+ ByteSizeConsistencyError(byte_size, msg.ByteSizeLong(), end - start);
}
}
@@ -593,31 +594,44 @@ void TruncateFields(Message* message, int max_len) {
}
namespace {
-class SecureFieldPrinter : public TextFormat::FieldValuePrinter {
+class SecureFieldPrinter : public TextFormat::FastFieldValuePrinter {
public:
- using super = TextFormat::FieldValuePrinter;
+ using super = TextFormat::FastFieldValuePrinter;
+ using BaseTextGenerator = TextFormat::BaseTextGenerator;
- string PrintFieldName(const Message& message,
- const Reflection* reflection,
- const FieldDescriptor* field) const override {
+ void PrintFieldName(const Message& message,
+ const Reflection* reflection,
+ const FieldDescriptor* field,
+ BaseTextGenerator* generator) const override {
hide_next_string_ = field->cpp_type() == FieldDescriptor::CPPTYPE_STRING &&
field->options().GetExtension(REDACT);
- return super::PrintFieldName(message, reflection, field);
+ super::PrintFieldName(message, reflection, field, generator);
}
- string PrintString(const string& val) const override {
+ void PrintFieldName(const Message& message, int field_index,
+ int field_count, const Reflection* reflection,
+ const FieldDescriptor* field,
+ BaseTextGenerator* generator) const override {
+ hide_next_string_ = field->cpp_type() == FieldDescriptor::CPPTYPE_STRING &&
+ field->options().GetExtension(REDACT);
+ super::PrintFieldName(message, field_index, field_count, reflection, field,
+ generator);
+ }
+
+ void PrintString(const string& val, BaseTextGenerator* generator) const
override {
if (hide_next_string_) {
hide_next_string_ = false;
- return KUDU_REDACT(super::PrintString(val));
+ super::PrintString(KUDU_REDACT(val), generator);
+ return;
}
- return super::PrintString(val);
+ super::PrintString(val, generator);
}
- string PrintBytes(const string& val) const override {
+ void PrintBytes(const string& val, BaseTextGenerator* generator) const
override {
if (hide_next_string_) {
hide_next_string_ = false;
- return KUDU_REDACT(super::PrintBytes(val));
+ super::PrintBytes(KUDU_REDACT(val), generator);
}
- return super::PrintBytes(val);
+ super::PrintBytes(val, generator);
}
mutable bool hide_next_string_ = false;
@@ -774,9 +788,10 @@ const string& WritablePBContainerFile::filename() const {
Status WritablePBContainerFile::AppendMsgToBuffer(const Message& msg,
faststring* buf) {
DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg);
- int data_len = msg.ByteSize();
- // Messages >2G cannot be serialized due to overflow computing ByteSize.
- DCHECK_GE(data_len, 0) << "Error computing ByteSize";
+ size_t data_len_long = msg.ByteSizeLong();
+ // Messages >2G cannot be serialized due to format restrictions.
+ CHECK_LT(data_len_long, std::numeric_limits<int32_t>::max());
+ uint32_t data_len = static_cast<uint32_t>(data_len_long);
uint64_t record_buflen = sizeof(uint32_t) + data_len + sizeof(uint32_t);
if (version_ >= 2) {
record_buflen += sizeof(uint32_t); // Additional checksum just for the
length.
@@ -789,7 +804,7 @@ Status WritablePBContainerFile::AppendMsgToBuffer(const
Message& msg, faststring
// Serialize the data length.
size_t cur_offset = 0;
- InlineEncodeFixed32(dst + cur_offset, static_cast<uint32_t>(data_len));
+ InlineEncodeFixed32(dst + cur_offset, data_len);
cur_offset += sizeof(uint32_t);
// For version >= 2: Serialize the checksum of the data length.
diff --git a/src/kudu/util/protobuf_util.h b/src/kudu/util/protobuf_util.h
index cc88eda..6065f43 100644
--- a/src/kudu/util/protobuf_util.h
+++ b/src/kudu/util/protobuf_util.h
@@ -17,19 +17,24 @@
#ifndef KUDU_UTIL_PROTOBUF_UTIL_H
#define KUDU_UTIL_PROTOBUF_UTIL_H
+#include <cstdint>
+
+#include <glog/logging.h>
#include <google/protobuf/message_lite.h>
+#include "kudu/util/faststring.h"
+
namespace kudu {
-bool AppendPBToString(const google::protobuf::MessageLite &msg, faststring
*output) {
- int old_size = output->size();
- int byte_size = msg.ByteSize();
+inline bool AppendPBToString(const google::protobuf::MessageLite &msg,
faststring *output) {
+ size_t old_size = output->size();
+ size_t byte_size = msg.ByteSizeLong();
output->resize(old_size + byte_size);
- uint8* start = reinterpret_cast<uint8*>(output->data() + old_size);
- uint8* end = msg.SerializeWithCachedSizesToArray(start);
+ uint8_t* start = reinterpret_cast<uint8_t*>(output->data() + old_size);
+ uint8_t* end = msg.SerializeWithCachedSizesToArray(start);
CHECK(end - start == byte_size)
<< "Error in serialization. byte_size=" << byte_size
- << " new ByteSize()=" << msg.ByteSize()
+ << " new ByteSizeLong()=" << msg.ByteSizeLong()
<< " end-start=" << (end-start);
return true;
}
diff --git a/thirdparty/vars.sh b/thirdparty/vars.sh
index fd0ef23..80a244b 100644
--- a/thirdparty/vars.sh
+++ b/thirdparty/vars.sh
@@ -54,7 +54,7 @@ GPERFTOOLS_VERSION=2.6.90
GPERFTOOLS_NAME=gperftools-$GPERFTOOLS_VERSION
GPERFTOOLS_SOURCE=$TP_SOURCE_DIR/$GPERFTOOLS_NAME
-PROTOBUF_VERSION=3.4.1
+PROTOBUF_VERSION=3.12.3
PROTOBUF_NAME=protobuf-$PROTOBUF_VERSION
PROTOBUF_SOURCE=$TP_SOURCE_DIR/$PROTOBUF_NAME