Repository: kudu Updated Branches: refs/heads/master 6ed6989ab -> 5a46e6076
KUDU-78. Fix pb_util functions which return bool to return Status or void As suggested in the ticket, I've refactored four pb_util functions returning bool. As it turns out, three of them should never fail (except via CHECK failures), so I've changed these to return void and added one missing check. The return type change was propagated to callers as well: any caller that can no longer fail was also refactored to return void. The fourth one (ParseFromSequentialFile) can actually fail in two ways: either with an IO error or with a Protobuf parsing error (Status::Corruption). To track which one, I had to add a status() method to SequentialFileFileInputStream. Change-Id: Ib760793f6e6da3e357573e525f47b32c79472468 Reviewed-on: http://gerrit.cloudera.org:8080/4800 Tested-by: Kudu Jenkins Reviewed-by: Todd Lipcon <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5a46e607 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5a46e607 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5a46e607 Branch: refs/heads/master Commit: 5a46e6076d8b26971202d31ab04326339c961796 Parents: 6ed6989 Author: Maxim Smyatkin <[email protected]> Authored: Sat Oct 22 23:20:47 2016 +0300 Committer: Todd Lipcon <[email protected]> Committed: Tue Oct 25 22:50:44 2016 +0000 ---------------------------------------------------------------------- src/kudu/cfile/bloomfile.cc | 2 +- src/kudu/cfile/cfile_writer.cc | 9 +--- src/kudu/consensus/log.cc | 34 ++++++--------- src/kudu/consensus/log.h | 6 +-- src/kudu/consensus/log_util.cc | 10 +---- src/kudu/consensus/mt-log-test.cc | 2 +- src/kudu/master/sys_catalog.cc | 61 +++++++++------------------ src/kudu/master/sys_catalog.h | 18 ++++---- src/kudu/tablet/delta_compaction-test.cc | 2 +- src/kudu/tablet/delta_compaction.cc | 4 +- src/kudu/tablet/delta_store.cc | 2 +- src/kudu/tablet/deltafile-test.cc | 2 +- src/kudu/tablet/deltafile.cc | 8 +--- src/kudu/tablet/deltafile.h | 2 +- 
src/kudu/tablet/deltamemstore.cc | 2 +- src/kudu/util/pb_util-internal.h | 4 ++ src/kudu/util/pb_util.cc | 35 +++++++++------ src/kudu/util/pb_util.h | 9 ++-- 18 files changed, 87 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/5a46e607/src/kudu/cfile/bloomfile.cc ---------------------------------------------------------------------- diff --git a/src/kudu/cfile/bloomfile.cc b/src/kudu/cfile/bloomfile.cc index c4822f2..f49efab 100644 --- a/src/kudu/cfile/bloomfile.cc +++ b/src/kudu/cfile/bloomfile.cc @@ -113,7 +113,7 @@ Status BloomFileWriter::FinishCurrentBloomBlock() { hdr.set_num_hash_functions(bloom_builder_.n_hashes()); faststring hdr_str; PutFixed32(&hdr_str, hdr.ByteSize()); - CHECK(pb_util::AppendToString(hdr, &hdr_str)); + pb_util::AppendToString(hdr, &hdr_str); // The data is the concatenation of the header and the bloom itself. vector<Slice> slices; http://git-wip-us.apache.org/repos/asf/kudu/blob/5a46e607/src/kudu/cfile/cfile_writer.cc ---------------------------------------------------------------------- diff --git a/src/kudu/cfile/cfile_writer.cc b/src/kudu/cfile/cfile_writer.cc index 4107656..091bf26 100644 --- a/src/kudu/cfile/cfile_writer.cc +++ b/src/kudu/cfile/cfile_writer.cc @@ -168,10 +168,7 @@ Status CFileWriter::Start() { buf.append(kMagicString); // Then Length-prefixed header. 
PutFixed32(&buf, pb_size); - if (!pb_util::AppendToString(header, &buf)) { - return Status::Corruption("unable to encode header"); - } - + pb_util::AppendToString(header, &buf); RETURN_NOT_OK_PREPEND(block_->Append(Slice(buf)), "Couldn't write header"); off_ += buf.size(); @@ -237,9 +234,7 @@ Status CFileWriter::FinishAndReleaseBlock(ScopedWritableBlockCloser* closer) { FlushMetadataToPB(footer.mutable_metadata()); faststring footer_str; - if (!pb_util::SerializeToString(footer, &footer_str)) { - return Status::Corruption("unable to serialize footer"); - } + pb_util::SerializeToString(footer, &footer_str); footer_str.append(kMagicString); PutFixed32(&footer_str, footer.GetCachedSize()); http://git-wip-us.apache.org/repos/asf/kudu/blob/5a46e607/src/kudu/consensus/log.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc index b77257a..4225c07 100644 --- a/src/kudu/consensus/log.cc +++ b/src/kudu/consensus/log.cc @@ -435,20 +435,18 @@ Status Log::Reserve(LogEntryTypePB type, return Status::OK(); } -Status Log::AsyncAppend(LogEntryBatch* entry_batch, const StatusCallback& callback) { +void Log::AsyncAppend(LogEntryBatch* entry_batch, const StatusCallback& callback) { TRACE_EVENT0("log", "Log::AsyncAppend"); { shared_lock<rw_spinlock> l(state_lock_.get_lock()); CHECK_EQ(kLogWriting, log_state_); } - RETURN_NOT_OK(entry_batch->Serialize()); + entry_batch->Serialize(); entry_batch->set_callback(callback); TRACE("Serialized $0 byte log entry", entry_batch->total_size_bytes()); TRACE_EVENT_FLOW_BEGIN0("log", "Batch", entry_batch); entry_batch->MarkReady(); - - return Status::OK(); } Status Log::AsyncAppendReplicates(const vector<ReplicateRefPtr>& replicates, @@ -463,7 +461,7 @@ Status Log::AsyncAppendReplicates(const vector<ReplicateRefPtr>& replicates, // replicate while we're appending. 
reserved_entry_batch->SetReplicates(replicates); - RETURN_NOT_OK(AsyncAppend(reserved_entry_batch, callback)); + AsyncAppend(reserved_entry_batch, callback); return Status::OK(); } @@ -479,7 +477,7 @@ Status Log::AsyncAppendCommit(gscoped_ptr<consensus::CommitMsg> commit_msg, LogEntryBatch* reserved_entry_batch; RETURN_NOT_OK(Reserve(COMMIT, std::move(batch), &reserved_entry_batch)); - RETURN_NOT_OK(AsyncAppend(reserved_entry_batch, callback)); + AsyncAppend(reserved_entry_batch, callback); return Status::OK(); } @@ -669,13 +667,11 @@ Status Log::Append(LogEntryPB* entry) { entry_batch_pb->mutable_entry()->AddAllocated(entry); LogEntryBatch entry_batch(entry->type(), std::move(entry_batch_pb), 1); entry_batch.state_ = LogEntryBatch::kEntryReserved; - Status s = entry_batch.Serialize(); + entry_batch.Serialize(); + entry_batch.state_ = LogEntryBatch::kEntryReady; + Status s = DoAppend(&entry_batch); if (s.ok()) { - entry_batch.state_ = LogEntryBatch::kEntryReady; - s = DoAppend(&entry_batch); - if (s.ok()) { - s = Sync(); - } + s = Sync(); } entry_batch.entry_batch_pb_->mutable_entry()->ExtractSubrange(0, 1, nullptr); return s; @@ -689,7 +685,7 @@ Status Log::WaitUntilAllFlushed() { LogEntryBatch* reserved_entry_batch; RETURN_NOT_OK(Reserve(FLUSH_MARKER, std::move(entry_batch), &reserved_entry_batch)); Synchronizer s; - RETURN_NOT_OK(AsyncAppend(reserved_entry_batch, s.AsStatusCallback())); + AsyncAppend(reserved_entry_batch, s.AsStatusCallback()); return s.Wait(); } @@ -1005,23 +1001,17 @@ void LogEntryBatch::MarkReserved() { state_ = kEntryReserved; } -Status LogEntryBatch::Serialize() { +void LogEntryBatch::Serialize() { DCHECK_EQ(state_, kEntryReserved); buffer_.clear(); // FLUSH_MARKER LogEntries are markers and are not serialized. 
if (PREDICT_FALSE(count() == 1 && entry_batch_pb_->entry(0).type() == FLUSH_MARKER)) { state_ = kEntrySerialized; - return Status::OK(); + return; } buffer_.reserve(total_size_bytes_); - - if (!pb_util::AppendToString(*entry_batch_pb_, &buffer_)) { - return Status::IOError(Substitute("unable to serialize the entry batch, contents: $1", - entry_batch_pb_->DebugString())); - } - + pb_util::AppendToString(*entry_batch_pb_, &buffer_); state_ = kEntrySerialized; - return Status::OK(); } void LogEntryBatch::MarkReady() { http://git-wip-us.apache.org/repos/asf/kudu/blob/5a46e607/src/kudu/consensus/log.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h index fce13c4..46c573c 100644 --- a/src/kudu/consensus/log.h +++ b/src/kudu/consensus/log.h @@ -114,8 +114,8 @@ class Log : public RefCountedThreadSafe<Log> { // Asynchronously appends 'entry_batch' to the log. Once the append // completes and is synced, 'callback' will be invoked. - Status AsyncAppend(LogEntryBatch* entry_batch, - const StatusCallback& callback); + void AsyncAppend(LogEntryBatch* entry_batch, + const StatusCallback& callback); // Synchronously append a new entry to the log. // Log does not take ownership of the passed 'entry'. @@ -463,7 +463,7 @@ class LogEntryBatch { gscoped_ptr<LogEntryBatchPB> entry_batch_pb, size_t count); // Serializes contents of the entry to an internal buffer. 
- Status Serialize(); + void Serialize(); // Sets the callback that will be invoked after the entry is // appended and synced to disk http://git-wip-us.apache.org/repos/asf/kudu/blob/5a46e607/src/kudu/consensus/log_util.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc index 340531a..8ed9185 100644 --- a/src/kudu/consensus/log_util.cc +++ b/src/kudu/consensus/log_util.cc @@ -725,9 +725,7 @@ Status WritableLogSegment::WriteHeaderAndOpen(const LogSegmentHeaderPB& new_head // Then Length-prefixed header. PutFixed32(&buf, new_header.ByteSize()); // Then Serialize the PB. - if (!pb_util::AppendToString(new_header, &buf)) { - return Status::Corruption("unable to encode header"); - } + pb_util::AppendToString(new_header, &buf); RETURN_NOT_OK(writable_file()->Append(Slice(buf))); header_.CopyFrom(new_header); @@ -746,11 +744,7 @@ Status WritableLogSegment::WriteFooterAndClose(const LogSegmentFooterPB& footer) DCHECK(footer.IsInitialized()) << footer.InitializationErrorString(); faststring buf; - - if (!pb_util::AppendToString(footer, &buf)) { - return Status::Corruption("unable to encode header"); - } - + pb_util::AppendToString(footer, &buf); buf.append(kLogSegmentFooterMagicString); PutFixed32(&buf, footer.ByteSize()); http://git-wip-us.apache.org/repos/asf/kudu/blob/5a46e607/src/kudu/consensus/mt-log-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/mt-log-test.cc b/src/kudu/consensus/mt-log-test.cc index a1bfb87..160dfb9 100644 --- a/src/kudu/consensus/mt-log-test.cc +++ b/src/kudu/consensus/mt-log-test.cc @@ -118,7 +118,7 @@ class MultiThreadedLogTest : public LogTestBase { } // lock_guard scope auto cb = new CustomLatchCallback(&latch, &errors); entry_batch->SetReplicates(batch_replicates); - ASSERT_OK(log_->AsyncAppend(entry_batch, cb->AsStatusCallback())); + log_->AsyncAppend(entry_batch, 
cb->AsStatusCallback()); } LOG_TIMING(INFO, strings::Substitute("thread $0 waiting to append and sync $1 batches", thread_id, FLAGS_num_batches_per_thread)) { http://git-wip-us.apache.org/repos/asf/kudu/blob/5a46e607/src/kudu/master/sys_catalog.cc ---------------------------------------------------------------------- diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc index 8999b7a..27e1ce8 100644 --- a/src/kudu/master/sys_catalog.cc +++ b/src/kudu/master/sys_catalog.cc @@ -412,18 +412,18 @@ Status SysCatalogTable::Write(const Actions& actions) { RETURN_NOT_OK(SchemaToPB(schema_, req.mutable_schema())); if (actions.table_to_add) { - RETURN_NOT_OK(ReqAddTable(&req, actions.table_to_add)); + ReqAddTable(&req, actions.table_to_add); } if (actions.table_to_update) { - RETURN_NOT_OK(ReqUpdateTable(&req, actions.table_to_update)); + ReqUpdateTable(&req, actions.table_to_update); } if (actions.table_to_delete) { - RETURN_NOT_OK(ReqDeleteTable(&req, actions.table_to_delete)); + ReqDeleteTable(&req, actions.table_to_delete); } - RETURN_NOT_OK(ReqAddTablets(&req, actions.tablets_to_add)); - RETURN_NOT_OK(ReqUpdateTablets(&req, actions.tablets_to_update)); - RETURN_NOT_OK(ReqDeleteTablets(&req, actions.tablets_to_delete)); + ReqAddTablets(&req, actions.tablets_to_add); + ReqUpdateTablets(&req, actions.tablets_to_update); + ReqDeleteTablets(&req, actions.tablets_to_delete); RETURN_NOT_OK(SyncWrite(&req, &resp)); return Status::OK(); @@ -433,12 +433,9 @@ Status SysCatalogTable::Write(const Actions& actions) { // Table related methods // ================================================================== -Status SysCatalogTable::ReqAddTable(WriteRequestPB* req, const TableInfo* table) { +void SysCatalogTable::ReqAddTable(WriteRequestPB* req, const TableInfo* table) { faststring metadata_buf; - if (!pb_util::SerializeToString(table->metadata().dirty().pb, &metadata_buf)) { - return Status::Corruption("Unable to serialize SysCatalogTablesEntryPB for 
tablet", - table->metadata().dirty().name()); - } + pb_util::SerializeToString(table->metadata().dirty().pb, &metadata_buf); KuduPartialRow row(&schema_); CHECK_OK(row.SetInt8(kSysCatalogTableColType, TABLES_ENTRY)); @@ -446,15 +443,11 @@ Status SysCatalogTable::ReqAddTable(WriteRequestPB* req, const TableInfo* table) CHECK_OK(row.SetStringNoCopy(kSysCatalogTableColMetadata, metadata_buf)); RowOperationsPBEncoder enc(req->mutable_row_operations()); enc.Add(RowOperationsPB::INSERT, row); - return Status::OK(); } -Status SysCatalogTable::ReqUpdateTable(WriteRequestPB* req, const TableInfo* table) { +void SysCatalogTable::ReqUpdateTable(WriteRequestPB* req, const TableInfo* table) { faststring metadata_buf; - if (!pb_util::SerializeToString(table->metadata().dirty().pb, &metadata_buf)) { - return Status::Corruption("Unable to serialize SysCatalogTablesEntryPB for tablet", - table->id()); - } + pb_util::SerializeToString(table->metadata().dirty().pb, &metadata_buf); KuduPartialRow row(&schema_); CHECK_OK(row.SetInt8(kSysCatalogTableColType, TABLES_ENTRY)); @@ -462,16 +455,14 @@ Status SysCatalogTable::ReqUpdateTable(WriteRequestPB* req, const TableInfo* tab CHECK_OK(row.SetStringNoCopy(kSysCatalogTableColMetadata, metadata_buf)); RowOperationsPBEncoder enc(req->mutable_row_operations()); enc.Add(RowOperationsPB::UPDATE, row); - return Status::OK(); } -Status SysCatalogTable::ReqDeleteTable(WriteRequestPB* req, const TableInfo* table) { +void SysCatalogTable::ReqDeleteTable(WriteRequestPB* req, const TableInfo* table) { KuduPartialRow row(&schema_); CHECK_OK(row.SetInt8(kSysCatalogTableColType, TABLES_ENTRY)); CHECK_OK(row.SetStringNoCopy(kSysCatalogTableColId, table->id())); RowOperationsPBEncoder enc(req->mutable_row_operations()); enc.Add(RowOperationsPB::DELETE, row); - return Status::OK(); } Status SysCatalogTable::VisitTables(TableVisitor* visitor) { @@ -523,48 +514,36 @@ Status SysCatalogTable::VisitTableFromRow(const RowBlockRow& row, // Tablet related methods 
// ================================================================== -Status SysCatalogTable::ReqAddTablets(WriteRequestPB* req, - const vector<TabletInfo*>& tablets) { +void SysCatalogTable::ReqAddTablets(WriteRequestPB* req, + const vector<TabletInfo*>& tablets) { faststring metadata_buf; KuduPartialRow row(&schema_); RowOperationsPBEncoder enc(req->mutable_row_operations()); for (auto tablet : tablets) { - if (!pb_util::SerializeToString(tablet->metadata().dirty().pb, &metadata_buf)) { - return Status::Corruption("Unable to serialize SysCatalogTabletsEntryPB for tablet", - tablet->tablet_id()); - } - + pb_util::SerializeToString(tablet->metadata().dirty().pb, &metadata_buf); CHECK_OK(row.SetInt8(kSysCatalogTableColType, TABLETS_ENTRY)); CHECK_OK(row.SetStringNoCopy(kSysCatalogTableColId, tablet->tablet_id())); CHECK_OK(row.SetStringNoCopy(kSysCatalogTableColMetadata, metadata_buf)); enc.Add(RowOperationsPB::INSERT, row); } - - return Status::OK(); } -Status SysCatalogTable::ReqUpdateTablets(WriteRequestPB* req, - const vector<TabletInfo*>& tablets) { +void SysCatalogTable::ReqUpdateTablets(WriteRequestPB* req, + const vector<TabletInfo*>& tablets) { faststring metadata_buf; KuduPartialRow row(&schema_); RowOperationsPBEncoder enc(req->mutable_row_operations()); for (auto tablet : tablets) { - if (!pb_util::SerializeToString(tablet->metadata().dirty().pb, &metadata_buf)) { - return Status::Corruption("Unable to serialize SysCatalogTabletsEntryPB for tablet", - tablet->tablet_id()); - } - + pb_util::SerializeToString(tablet->metadata().dirty().pb, &metadata_buf); CHECK_OK(row.SetInt8(kSysCatalogTableColType, TABLETS_ENTRY)); CHECK_OK(row.SetStringNoCopy(kSysCatalogTableColId, tablet->tablet_id())); CHECK_OK(row.SetStringNoCopy(kSysCatalogTableColMetadata, metadata_buf)); enc.Add(RowOperationsPB::UPDATE, row); } - - return Status::OK(); } -Status SysCatalogTable::ReqDeleteTablets(WriteRequestPB* req, - const vector<TabletInfo*>& tablets) { +void 
SysCatalogTable::ReqDeleteTablets(WriteRequestPB* req, + const vector<TabletInfo*>& tablets) { KuduPartialRow row(&schema_); RowOperationsPBEncoder enc(req->mutable_row_operations()); for (auto tablet : tablets) { @@ -572,8 +551,6 @@ Status SysCatalogTable::ReqDeleteTablets(WriteRequestPB* req, CHECK_OK(row.SetStringNoCopy(kSysCatalogTableColId, tablet->tablet_id())); enc.Add(RowOperationsPB::DELETE, row); } - - return Status::OK(); } Status SysCatalogTable::VisitTabletFromRow(const RowBlockRow& row, TabletVisitor *visitor) { http://git-wip-us.apache.org/repos/asf/kudu/blob/5a46e607/src/kudu/master/sys_catalog.h ---------------------------------------------------------------------- diff --git a/src/kudu/master/sys_catalog.h b/src/kudu/master/sys_catalog.h index ff11ca3..e4702ed 100644 --- a/src/kudu/master/sys_catalog.h +++ b/src/kudu/master/sys_catalog.h @@ -176,15 +176,15 @@ class SysCatalogTable { void InitLocalRaftPeerPB(); // Add an operation to a write adding/updating/deleting a table or tablet. 
- Status ReqAddTable(tserver::WriteRequestPB* req, const TableInfo* table); - Status ReqUpdateTable(tserver::WriteRequestPB* req, const TableInfo* table); - Status ReqDeleteTable(tserver::WriteRequestPB* req, const TableInfo* table); - Status ReqAddTablets(tserver::WriteRequestPB* req, - const std::vector<TabletInfo*>& tablets); - Status ReqUpdateTablets(tserver::WriteRequestPB* req, - const std::vector<TabletInfo*>& tablets); - Status ReqDeleteTablets(tserver::WriteRequestPB* req, - const std::vector<TabletInfo*>& tablets); + void ReqAddTable(tserver::WriteRequestPB* req, const TableInfo* table); + void ReqUpdateTable(tserver::WriteRequestPB* req, const TableInfo* table); + void ReqDeleteTable(tserver::WriteRequestPB* req, const TableInfo* table); + void ReqAddTablets(tserver::WriteRequestPB* req, + const std::vector<TabletInfo*>& tablets); + void ReqUpdateTablets(tserver::WriteRequestPB* req, + const std::vector<TabletInfo*>& tablets); + void ReqDeleteTablets(tserver::WriteRequestPB* req, + const std::vector<TabletInfo*>& tablets); // Special string injected into SyncWrite() random failures (if enabled). 
// http://git-wip-us.apache.org/repos/asf/kudu/blob/5a46e607/src/kudu/tablet/delta_compaction-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/delta_compaction-test.cc b/src/kudu/tablet/delta_compaction-test.cc index 78721c0..d0795cc 100644 --- a/src/kudu/tablet/delta_compaction-test.cc +++ b/src/kudu/tablet/delta_compaction-test.cc @@ -173,7 +173,7 @@ TEST_F(TestDeltaCompaction, TestMergeMultipleSchemas) { row_id++; } - ASSERT_OK(dfw->WriteDeltaStats(stats)); + dfw->WriteDeltaStats(stats); ASSERT_OK(dfw->Finish()); shared_ptr<DeltaFileReader> dfr; ASSERT_OK(GetDeltaFileReader(block_id, &dfr)); http://git-wip-us.apache.org/repos/asf/kudu/blob/5a46e607/src/kudu/tablet/delta_compaction.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/delta_compaction.cc b/src/kudu/tablet/delta_compaction.cc index 59a6da2..2c753c7 100644 --- a/src/kudu/tablet/delta_compaction.cc +++ b/src/kudu/tablet/delta_compaction.cc @@ -218,12 +218,12 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas() { RETURN_NOT_OK(base_data_writer_->Finish()); if (redo_delta_mutations_written_ > 0) { - RETURN_NOT_OK(new_redo_delta_writer_->WriteDeltaStats(redo_stats)); + new_redo_delta_writer_->WriteDeltaStats(redo_stats); RETURN_NOT_OK(new_redo_delta_writer_->Finish()); } if (undo_delta_mutations_written_ > 0) { - RETURN_NOT_OK(new_undo_delta_writer_->WriteDeltaStats(undo_stats)); + new_undo_delta_writer_->WriteDeltaStats(undo_stats); RETURN_NOT_OK(new_undo_delta_writer_->Finish()); } http://git-wip-us.apache.org/repos/asf/kudu/blob/5a46e607/src/kudu/tablet/delta_store.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/delta_store.cc b/src/kudu/tablet/delta_store.cc index 029015c..75b591c 100644 --- a/src/kudu/tablet/delta_store.cc +++ b/src/kudu/tablet/delta_store.cc @@ -118,7 +118,7 @@ Status WriteDeltaIteratorToFile(DeltaIterator* iter, i += 
n; } - RETURN_NOT_OK(out->WriteDeltaStats(stats)); + out->WriteDeltaStats(stats); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/5a46e607/src/kudu/tablet/deltafile-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/deltafile-test.cc b/src/kudu/tablet/deltafile-test.cc index 96938b0..8f82b37 100644 --- a/src/kudu/tablet/deltafile-test.cc +++ b/src/kudu/tablet/deltafile-test.cc @@ -92,7 +92,7 @@ class TestDeltaFile : public KuduTest { ASSERT_OK_FAST(stats.UpdateStats(key.timestamp(), rcl)); } } - ASSERT_OK(dfw.WriteDeltaStats(stats)); + dfw.WriteDeltaStats(stats); ASSERT_OK(dfw.Finish()); } http://git-wip-us.apache.org/repos/asf/kudu/blob/5a46e607/src/kudu/tablet/deltafile.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc index 8f36392..255c4e8 100644 --- a/src/kudu/tablet/deltafile.cc +++ b/src/kudu/tablet/deltafile.cc @@ -155,17 +155,13 @@ Status DeltaFileWriter::AppendDelta<UNDO>( return DoAppendDelta(key, delta); } -Status DeltaFileWriter::WriteDeltaStats(const DeltaStats& stats) { +void DeltaFileWriter::WriteDeltaStats(const DeltaStats& stats) { DeltaStatsPB delta_stats_pb; stats.ToPB(&delta_stats_pb); faststring buf; - if (!pb_util::SerializeToString(delta_stats_pb, &buf)) { - return Status::IOError("Unable to serialize DeltaStatsPB", delta_stats_pb.DebugString()); - } - + pb_util::SerializeToString(delta_stats_pb, &buf); writer_->AddMetadataPair(DeltaFileReader::kDeltaStatsEntryName, buf.ToString()); - return Status::OK(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/5a46e607/src/kudu/tablet/deltafile.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h index b7951ff..ea040d7 100644 --- a/src/kudu/tablet/deltafile.h +++ b/src/kudu/tablet/deltafile.h @@ -80,7 +80,7 @@ class DeltaFileWriter 
{ template<DeltaType Type> Status AppendDelta(const DeltaKey &key, const RowChangeList &delta); - Status WriteDeltaStats(const DeltaStats& stats); + void WriteDeltaStats(const DeltaStats& stats); private: Status DoAppendDelta(const DeltaKey &key, const RowChangeList &delta); http://git-wip-us.apache.org/repos/asf/kudu/blob/5a46e607/src/kudu/tablet/deltamemstore.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc index 103ba29..de58c11 100644 --- a/src/kudu/tablet/deltamemstore.cc +++ b/src/kudu/tablet/deltamemstore.cc @@ -125,7 +125,7 @@ Status DeltaMemStore::FlushToFile(DeltaFileWriter *dfw, stats->UpdateStats(key.timestamp(), rcl); iter->Next(); } - RETURN_NOT_OK(dfw->WriteDeltaStats(*stats)); + dfw->WriteDeltaStats(*stats); stats_ret->swap(stats); return Status::OK(); http://git-wip-us.apache.org/repos/asf/kudu/blob/5a46e607/src/kudu/util/pb_util-internal.h ---------------------------------------------------------------------- diff --git a/src/kudu/util/pb_util-internal.h b/src/kudu/util/pb_util-internal.h index 35edf4e..b4fac20 100644 --- a/src/kudu/util/pb_util-internal.h +++ b/src/kudu/util/pb_util-internal.h @@ -56,6 +56,10 @@ class SequentialFileFileInputStream : public google::protobuf::io::ZeroCopyInput return total_read_; } + Status status() const { + return status_; + } + private: static const size_t kDefaultBufferSize = 8192; http://git-wip-us.apache.org/repos/asf/kudu/blob/5a46e607/src/kudu/util/pb_util.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/pb_util.cc b/src/kudu/util/pb_util.cc index 5180153..c31203b 100644 --- a/src/kudu/util/pb_util.cc +++ b/src/kudu/util/pb_util.cc @@ -412,33 +412,41 @@ Status ReadSupplementalHeader(ReadableFileType* reader, int version, uint64_t* o } // anonymous namespace -bool AppendToString(const MessageLite &msg, faststring *output) { +void AppendToString(const 
MessageLite &msg, faststring *output) { DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg); - return AppendPartialToString(msg, output); + AppendPartialToString(msg, output); } -bool AppendPartialToString(const MessageLite &msg, faststring* output) { - int old_size = output->size(); +void AppendPartialToString(const MessageLite &msg, faststring* output) { + size_t old_size = output->size(); int byte_size = msg.ByteSize(); + // Messages >2G cannot be serialized due to overflow computing ByteSize. + DCHECK_GE(byte_size, 0) << "Error computing ByteSize"; - output->resize(old_size + byte_size); + output->resize(old_size + static_cast<size_t>(byte_size)); uint8* start = &((*output)[old_size]); uint8* end = msg.SerializeWithCachedSizesToArray(start); if (end - start != byte_size) { ByteSizeConsistencyError(byte_size, msg.ByteSize(), end - start); } - return true; } -bool SerializeToString(const MessageLite &msg, faststring *output) { +void SerializeToString(const MessageLite &msg, faststring *output) { output->clear(); - return AppendToString(msg, output); + AppendToString(msg, output); } -bool ParseFromSequentialFile(MessageLite *msg, SequentialFile *rfile) { +Status ParseFromSequentialFile(MessageLite *msg, SequentialFile *rfile) { SequentialFileFileInputStream istream(rfile); - return msg->ParseFromZeroCopyStream(&istream); + if (!msg->ParseFromZeroCopyStream(&istream)) { + RETURN_NOT_OK(istream.status()); + + // If it's not a file IO error then it's a parsing error. + // Probably, we read wrong or damaged data here. 
+ return Status::Corruption("Error parsing msg", InitializationErrorMessage("parse", *msg)); + } + return Status::OK(); } Status ParseFromArray(MessageLite* msg, const uint8_t* data, uint32_t length) { @@ -479,9 +487,7 @@ Status WritePBToPath(Env* env, const std::string& path, Status ReadPBFromPath(Env* env, const std::string& path, MessageLite* msg) { shared_ptr<SequentialFile> rfile; RETURN_NOT_OK(env_util::OpenFileForSequential(env, path, &rfile)); - if (!ParseFromSequentialFile(msg, rfile.get())) { - return Status::IOError("Unable to parse PB from path", path); - } + RETURN_NOT_OK(ParseFromSequentialFile(msg, rfile.get())); return Status::OK(); } @@ -651,7 +657,8 @@ Status WritablePBContainerFile::Close() { Status WritablePBContainerFile::AppendMsgToBuffer(const Message& msg, faststring* buf) { DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg); int data_len = msg.ByteSize(); - DCHECK_GE(data_len, 0); // ByteSize() returns an int, but it should never be negative. + // Messages >2G cannot be serialized due to overflow computing ByteSize. + DCHECK_GE(data_len, 0) << "Error computing ByteSize"; uint64_t record_buflen = sizeof(uint32_t) + data_len + sizeof(uint32_t); if (version_ >= 2) { record_buflen += sizeof(uint32_t); // Additional checksum just for the length. 
http://git-wip-us.apache.org/repos/asf/kudu/blob/5a46e607/src/kudu/util/pb_util.h ---------------------------------------------------------------------- diff --git a/src/kudu/util/pb_util.h b/src/kudu/util/pb_util.h index e59a930..48702fb 100644 --- a/src/kudu/util/pb_util.h +++ b/src/kudu/util/pb_util.h @@ -71,17 +71,16 @@ enum class FileState { extern const int kPBContainerMinimumValidLength; // See MessageLite::AppendToString -bool AppendToString(const MessageLite &msg, faststring *output); +void AppendToString(const MessageLite &msg, faststring *output); // See MessageLite::AppendPartialToString -bool AppendPartialToString(const MessageLite &msg, faststring *output); +void AppendPartialToString(const MessageLite &msg, faststring *output); // See MessageLite::SerializeToString. -bool SerializeToString(const MessageLite &msg, faststring *output); +void SerializeToString(const MessageLite &msg, faststring *output); // See MessageLite::ParseFromZeroCopyStream -// TODO: change this to return Status - differentiate IO error from bad PB -bool ParseFromSequentialFile(MessageLite *msg, SequentialFile *rfile); +Status ParseFromSequentialFile(MessageLite *msg, SequentialFile *rfile); // Similar to MessageLite::ParseFromArray, with the difference that it returns // Status::Corruption() if the message could not be parsed.
