This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 1ad8c1f18a35e06d4aafd7813de71ee335c68ea1 Author: Alexey Serbin <[email protected]> AuthorDate: Fri Sep 9 19:14:02 2022 -0700 [cfile] unsorted updates on cfile_reader.{h,cc} This patch makes the code in cfile_reader.{h,cc} a bit more robust: * check more rigorously for invariants to protect against unsigned integer underflow, etc. * don't crash but instead return Status::Corruption() when possible * optimized the check on whether to verify checksums * added 'final' to CFileIterator and DefaultColumnValueIterator classes to provide more de-virtualisation opportunities for compiler * added PREDICT_{FALSE,TRUE} where appropriate * added the 'runtime' tag for the --cfile_verify_checksums flag * other minor updates Change-Id: I25507963f87e08a6c3a8ba2ff1ca58836b713e18 Reviewed-on: http://gerrit.cloudera.org:8080/18965 Tested-by: Alexey Serbin <[email protected]> Reviewed-by: Abhishek Chennaka <[email protected]> Reviewed-by: Yifan Zhang <[email protected]> --- src/kudu/cfile/cfile_reader.cc | 144 ++++++++++++++++++++++++----------------- src/kudu/cfile/cfile_reader.h | 28 +++++--- 2 files changed, 101 insertions(+), 71 deletions(-) diff --git a/src/kudu/cfile/cfile_reader.cc b/src/kudu/cfile/cfile_reader.cc index bbd8779dd..5f1968da5 100644 --- a/src/kudu/cfile/cfile_reader.cc +++ b/src/kudu/cfile/cfile_reader.cc @@ -78,6 +78,7 @@ TAG_FLAG(cfile_lazy_open, hidden); DEFINE_bool(cfile_verify_checksums, true, "Verify the checksum for each block on read if one exists"); TAG_FLAG(cfile_verify_checksums, evolving); +TAG_FLAG(cfile_verify_checksums, runtime); DEFINE_double(cfile_inject_corruption, 0, "Fraction of the time that read operations on CFiles will fail " @@ -108,8 +109,8 @@ static const size_t kMaxHeaderFooterPBSize = 64*1024; static Status ParseMagicAndLength(const Slice& data, uint8_t* cfile_version, uint32_t* parsed_len) { - if (data.size() != kMagicAndLengthSize) { - return Status::Corruption("Bad size data"); + if (PREDICT_FALSE(data.size() != kMagicAndLengthSize)) { + return Status::Corruption("invalid data size", to_string(data.size())); } uint8_t version; @@ -122,7 +123,7 @@ static Status ParseMagicAndLength(const Slice& data, } uint32_t len = DecodeFixed32(data.data() + kMagicLength); - if (len > kMaxHeaderFooterPBSize) { + if (PREDICT_FALSE(len > kMaxHeaderFooterPBSize)) { return Status::Corruption("invalid data size for header", to_string(len)); } @@ -138,6 +139,7 @@ CFileReader::CFileReader(ReaderOptions options, block_(std::move(block)), file_size_(file_size), codec_(nullptr), + do_verify_checksum_(false), mem_consumption_(std::move(options.parent_mem_tracker), memory_footprint()) { } @@ -228,12 +230,11 @@ Status CFileReader::ReadAndParseHeader() { "failed to parse CFile pre-header"); // Quick check to ensure the header size is reasonable. - if (header_size >= file_size_ - kMagicAndLengthSize) { + if (PREDICT_FALSE(file_size_ <= header_size + kMagicAndLengthSize)) { return Status::Corruption("invalid CFile header size", to_string(header_size)); } // Setup the data slices. - uint64_t off = kMagicAndLengthSize; uint8_t header_scratch[header_size]; Slice header(header_scratch, header_size); uint8_t checksum_scratch[kChecksumSize]; @@ -241,23 +242,22 @@ Status CFileReader::ReadAndParseHeader() { // Read the header and checksum if needed. vector<Slice> results = { header }; - if (has_checksums() && FLAGS_cfile_verify_checksums) { + if (do_verify_checksum()) { results.push_back(checksum); } - RETURN_NOT_OK(block_->ReadV(off, results)); + RETURN_NOT_OK(block_->ReadV(kMagicAndLengthSize, results)); - if (has_checksums() && FLAGS_cfile_verify_checksums) { - Slice slices[] = { mal, header }; + if (do_verify_checksum()) { + Slice slices[]{ mal, header }; RETURN_NOT_OK(VerifyChecksum(slices, checksum)); } // Parse the protobuf header. header_.reset(new CFileHeaderPB()); - if (!header_->ParseFromArray(header.data(), header.size())) { + if (PREDICT_FALSE(!header_->ParseFromArray(header.data(), header.size()))) { return Status::Corruption("invalid CFile pb header", header.ToDebugString()); } - VLOG(2) << "Read header: " << SecureDebugString(*header_); return Status::OK(); @@ -267,7 +267,10 @@ Status CFileReader::ReadAndParseFooter() { TRACE_EVENT1("io", "CFileReader::ReadAndParseFooter", "cfile", ToString()); DCHECK(!init_once_.init_succeeded()); - CHECK_GT(file_size_, kMagicAndLengthSize) << "file too short: " << file_size_; + + if (PREDICT_FALSE(file_size_ <= kMagicAndLengthSize)) { + return Status::Corruption(Substitute("file too short: $0", file_size_)); + } // First read and parse the "post-footer", which has magic // and the length of the actual protobuf footer. @@ -278,7 +281,7 @@ Status CFileReader::ReadAndParseFooter() { RETURN_NOT_OK(ParseMagicAndLength(mal, &cfile_version_, &footer_size)); // Quick check to ensure the footer size is reasonable. - if (footer_size >= file_size_ - kMagicAndLengthSize) { + if (PREDICT_FALSE(file_size_ <= footer_size + kMagicAndLengthSize)) { return Status::Corruption(Substitute( "invalid CFile footer size $0 in block of size $1", footer_size, file_size_)); @@ -294,21 +297,31 @@ Status CFileReader::ReadAndParseFooter() { // We read the checksum position in case one exists. // This is done to avoid the need for a follow up read call. Slice results[2] = {checksum, footer}; - uint64_t off = file_size_ - kMagicAndLengthSize - footer_size - kChecksumSize; - RETURN_NOT_OK(block_->ReadV(off, results)); + if (PREDICT_FALSE(file_size_ < + kMagicAndLengthSize + footer_size + kChecksumSize)) { + return Status::Corruption(Substitute( + "unexpected CFile contents: total size $0, footer size $1", + file_size_, footer_size)); + } + RETURN_NOT_OK(block_->ReadV( + file_size_ - kMagicAndLengthSize - footer_size - kChecksumSize, results)); // Parse the protobuf footer. // This needs to be done before validating the checksum since the // incompatible_features flag tells us if a checksum exists at all. footer_.reset(new CFileFooterPB()); - if (!footer_->ParseFromArray(footer.data(), footer.size())) { + if (PREDICT_FALSE(!footer_->ParseFromArray(footer.data(), footer.size()))) { return Status::Corruption("invalid CFile pb footer", footer.ToDebugString()); } + // Remember the checksum verification choice. + do_verify_checksum_ = + PREDICT_TRUE(FLAGS_cfile_verify_checksums) && has_checksum(); + // Verify the footer checksum if needed. - if (has_checksums() && FLAGS_cfile_verify_checksums) { + if (do_verify_checksum()) { // If a checksum exists it was pre-read. - Slice slices[2] = {footer, mal}; + Slice slices[]{ footer, mal }; RETURN_NOT_OK(VerifyChecksum(slices, checksum)); } @@ -317,26 +330,21 @@ Status CFileReader::ReadAndParseFooter() { RETURN_NOT_OK_PREPEND(GetCompressionCodec(footer_->compression(), &codec_), "failed to load CFile compression codec"); } - VLOG(2) << "Read footer: " << SecureDebugString(*footer_); return Status::OK(); } -bool CFileReader::has_checksums() const { - return footer_->incompatible_features() & IncompatibleFeatures::CHECKSUM; -} - -Status CFileReader::VerifyChecksum(ArrayView<const Slice> data, const Slice& checksum) const { +Status CFileReader::VerifyChecksum(ArrayView<const Slice> data, const Slice& checksum) { uint32_t expected_checksum = DecodeFixed32(checksum.data()); uint32_t checksum_value = 0; - for (auto& d : data) { + for (const auto& d : data) { checksum_value = crc::Crc32c(d.data(), d.size(), checksum_value); } if (PREDICT_FALSE(checksum_value != expected_checksum || MaybeTrue(FLAGS_cfile_inject_corruption))) { return Status::Corruption( - Substitute("Checksum does not match: $0 vs expected $1", + Substitute("checksum does not match: $0 vs expected $1", checksum_value, expected_checksum)); } return Status::OK(); @@ -361,7 +369,9 @@ class ScratchMemory { public: ScratchMemory() : ptr_(nullptr), size_(-1) {} ~ScratchMemory() { - if (!ptr_) return; + if (!ptr_) { + return; + } if (!from_cache_.valid()) { delete[] ptr_; } @@ -372,14 +382,13 @@ class ScratchMemory { // to allocating from the heap. In that case, IsFromCache() will // return false. void TryAllocateFromCache(BlockCache* cache, const BlockCache::CacheKey& key, int size) { + DCHECK(!from_cache_.valid()); DCHECK(!ptr_); from_cache_ = cache->Allocate(key, size); if (!from_cache_.valid()) { - AllocateFromHeap(size); - return; - } else { - ptr_ = from_cache_.val_ptr(); + return AllocateFromHeap(size); } + ptr_ = from_cache_.val_ptr(); size_ = size; } @@ -434,8 +443,13 @@ Status CFileReader::ReadBlock(const IOContext* io_context, CacheControl cache_control, scoped_refptr<BlockHandle>* ret) const { DCHECK(init_once_.init_succeeded()); - CHECK(ptr.offset() > 0 && ptr.offset() + ptr.size() < file_size_) - << Substitute("bad offset $0 in file of size $1", ptr.ToString(), file_size_); + + if (PREDICT_FALSE(ptr.offset() == 0 || + file_size_ <= ptr.offset() + ptr.size())) { + return Status::Corruption(Substitute( + "bad offset $0 in file of size $1", ptr.ToString(), file_size_)); + } + BlockCacheHandle bc_handle; Cache::CacheBehavior cache_behavior = cache_control == CACHE_BLOCK ? Cache::EXPECT_IN_CACHE : Cache::NO_EXPECT_IN_CACHE; @@ -459,8 +473,8 @@ Status CFileReader::ReadBlock(const IOContext* io_context, TRACE_COUNTER_INCREMENT(CFILE_CACHE_MISS_BYTES_METRIC_NAME, ptr.size()); uint32_t data_size = ptr.size(); - if (has_checksums()) { - if (PREDICT_FALSE(kChecksumSize > data_size)) { + if (has_checksum()) { + if (PREDICT_FALSE(data_size < kChecksumSize)) { return Status::Corruption("invalid data size for block pointer", ptr.ToString()); } @@ -483,15 +497,14 @@ Status CFileReader::ReadBlock(const IOContext* io_context, // Read the data and checksum if needed. Slice results_backing[] = { block, checksum }; - bool read_checksum = has_checksums() && FLAGS_cfile_verify_checksums; - ArrayView<Slice> results(results_backing, read_checksum ? 2 : 1); + ArrayView<Slice> results(results_backing, do_verify_checksum() ? 2 : 1); RETURN_NOT_OK_PREPEND(block_->ReadV(ptr.offset(), results), Substitute("failed to read CFile block $0 at $1", block_id().ToString(), ptr.ToString())); - if (has_checksums() && FLAGS_cfile_verify_checksums) { - Status s = VerifyChecksum(ArrayView<const Slice>(&block, 1), checksum); - if (!s.ok()) { + if (do_verify_checksum()) { + if (auto s = VerifyChecksum(ArrayView<const Slice>(&block, 1), checksum); + PREDICT_FALSE(!s.ok())) { RETURN_NOT_OK_HANDLE_CORRUPTION( s.CloneAndPrepend(Substitute("checksum error on CFile block $0 at $1", block_id().ToString(), ptr.ToString())), @@ -503,11 +516,10 @@ Status CFileReader::ReadBlock(const IOContext* io_context, if (codec_ != nullptr) { // Init the decompressor and get the size required for the uncompressed buffer. CompressedBlockDecoder uncompressor(codec_, cfile_version_, block); - Status s = uncompressor.Init(); - if (!s.ok()) { - LOG(WARNING) << "Unable to validate compressed block " << block_id().ToString() - << " at " << ptr.offset() << " of size " << block.size() << ": " - << s.ToString(); + if (auto s = uncompressor.Init(); PREDICT_FALSE(!s.ok())) { + LOG(WARNING) << Substitute( + "unable to validate compressed block $0 of size $1 at offset $2: $3", + block_id().ToString(), block.size(), ptr.offset(), s.ToString()); return s; } int uncompressed_size = uncompressor.uncompressed_size(); @@ -520,11 +532,11 @@ Status CFileReader::ReadBlock(const IOContext* io_context, } else { decompressed_scratch.AllocateFromHeap(uncompressed_size); } - s = uncompressor.UncompressIntoBuffer(decompressed_scratch.get()); - if (!s.ok()) { - LOG(WARNING) << "Unable to uncompress block " << block_id().ToString() - << " at " << ptr.offset() - << " of size " << block.size() << ": " << s.ToString(); + if (auto s = uncompressor.UncompressIntoBuffer(decompressed_scratch.get()); + PREDICT_FALSE(!s.ok())) { + LOG(WARNING) << Substitute( + "unable to uncompress block $0 of size $1 at offset $2: $3", + block_id().ToString(), block.size(), ptr.offset(), s.ToString()); return s; } @@ -582,13 +594,6 @@ bool CFileReader::GetMetadataEntry(const string& key, string* val) const { return false; } -void CFileReader::HandleCorruption(const fs::IOContext* io_context) const { - DCHECK(io_context); - LOG(ERROR) << "Encountered corrupted CFile in filesystem block: " << block_->id().ToString(); - block_->block_manager()->error_manager()->RunErrorNotificationCb( - ErrorHandlerType::CFILE_CORRUPTION, io_context->tablet_id); -} - Status CFileReader::NewIterator(unique_ptr<CFileIterator>* iter, CacheControl cache_control, const IOContext* io_context) { @@ -596,6 +601,16 @@ Status CFileReader::NewIterator(unique_ptr<CFileIterator>* iter, return Status::OK(); } +bool CFileReader::has_checksum() const { + DCHECK(footer_); + return footer_->incompatible_features() & IncompatibleFeatures::CHECKSUM; +} + +bool CFileReader::do_verify_checksum() const { + DCHECK(footer_); + return do_verify_checksum_; +} + size_t CFileReader::memory_footprint() const { size_t size = kudu_malloc_usable_size(this); size += block_->memory_footprint(); @@ -613,6 +628,13 @@ size_t CFileReader::memory_footprint() const { return size; } +void CFileReader::HandleCorruption(const fs::IOContext* io_context) const { + DCHECK(io_context); + LOG(ERROR) << "Encountered corrupted CFile in filesystem block: " << block_->id().ToString(); + block_->block_manager()->error_manager()->RunErrorNotificationCb( + ErrorHandlerType::CFILE_CORRUPTION, io_context->tablet_id); +} + //////////////////////////////////////////////////////////// // Default Column Value Iterator //////////////////////////////////////////////////////////// @@ -924,12 +946,12 @@ Status DecodeNullInfo(scoped_refptr<BlockHandle>* data_block_handle, uint32_t* num_rows_in_block, Slice* non_null_bitmap) { Slice data_block = (*data_block_handle)->data(); - if (!GetVarint32(&data_block, num_rows_in_block)) { + if (PREDICT_FALSE(!GetVarint32(&data_block, num_rows_in_block))) { return Status::Corruption("bad null header, num elements in block"); } uint32_t non_null_bitmap_size; - if (!GetVarint32(&data_block, &non_null_bitmap_size)) { + if (PREDICT_FALSE(!GetVarint32(&data_block, &non_null_bitmap_size))) { return Status::Corruption("bad null header, bitmap size"); } @@ -1012,10 +1034,10 @@ Status CFileIterator::PrepareBatch(size_t* n) { // prepared_blocks_ queue. while (prepared_blocks_.back()->last_row_idx() < end_idx) { Status s = seeked_->Next(); - if (PREDICT_FALSE(s.IsNotFound())) { + if (s.IsNotFound()) { VLOG(1) << "Reached EOF"; break; - } else if (!s.ok()) { + } else if (PREDICT_FALSE(!s.ok())) { return s; } RETURN_NOT_OK(QueueCurrentDataBlock(*seeked_)); diff --git a/src/kudu/cfile/cfile_reader.h b/src/kudu/cfile/cfile_reader.h index f0f406c58..2f05ca4b1 100644 --- a/src/kudu/cfile/cfile_reader.h +++ b/src/kudu/cfile/cfile_reader.h @@ -176,19 +176,15 @@ class CFileReader { return BlockPointer(footer().validx_info().root_block()); } - // Returns true if the file has checksums on the header, footer, and data blocks. - bool has_checksums() const; - // Can be called before Init(). std::string ToString() const { return block_->id().ToString(); } - // Handles a corruption error. Functions that may return due to a CFile - // corruption should call this method before returning. - void HandleCorruption(const fs::IOContext* io_context) const; - private: DISALLOW_COPY_AND_ASSIGN(CFileReader); + static Status VerifyChecksum(ArrayView<const Slice> data, + const Slice& checksum); + CFileReader(ReaderOptions options, uint64_t file_size, std::unique_ptr<fs::ReadableBlock> block); @@ -198,11 +194,21 @@ class CFileReader { Status ReadAndParseHeader(); Status ReadAndParseFooter(); - Status VerifyChecksum(ArrayView<const Slice> data, const Slice& checksum) const; + + // Return true if the file has checksum on the header, footer, and data blocks. + bool has_checksum() const; + + // Return true if has_checksum() returns true and the checksum verification + // is requested. + bool do_verify_checksum() const; // Returns the memory usage of the object including the object itself. size_t memory_footprint() const; + // Handles a corruption error. Functions that may return due to a CFile + // corruption should call this method before returning. + void HandleCorruption(const fs::IOContext* io_context) const; + const std::unique_ptr<fs::ReadableBlock> block_; const uint64_t file_size_; @@ -214,6 +220,8 @@ class CFileReader { const TypeInfo* type_info_; const TypeEncodingInfo* type_encoding_info_; + bool do_verify_checksum_; + KuduOnceLambda init_once_; ScopedTrackedConsumption mem_consumption_; @@ -282,7 +290,7 @@ class ColumnIterator { // Example: // DefaultColumnValueIterator iter; // iter.Scan(&column_block); -class DefaultColumnValueIterator : public ColumnIterator { +class DefaultColumnValueIterator final : public ColumnIterator { public: DefaultColumnValueIterator(const TypeInfo* typeinfo, const void* value) : typeinfo_(typeinfo), @@ -312,7 +320,7 @@ class DefaultColumnValueIterator : public ColumnIterator { }; -class CFileIterator : public ColumnIterator { +class CFileIterator final : public ColumnIterator { public: CFileIterator(CFileReader* reader, CFileReader::CacheControl cache_control,
